summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2019-05-13 16:00:23 +0300
committerGitHub <noreply@github.com>2019-05-13 16:00:23 +0300
commit18552f70b8552e0fb9c656eb262a4e8018af02ce (patch)
treefccf8d861e7e698eae5213ee0ba7c73bf1a27b0d
parent5c80bea709f4c89db3d8652f4a3e7d8421efb76e (diff)
parent4d24352033d6778a2d5b9c20546280cb59686490 (diff)
downloadrabbitmq-server-git-18552f70b8552e0fb9c656eb262a4e8018af02ce.tar.gz
Merge pull request #1893 from rabbitmq/do_not_confirm_on_unreachable_queue
Make sure that publishes to dead or unaccessible queues return nack
-rw-r--r--src/dtree.erl205
-rw-r--r--src/rabbit_channel.erl119
-rw-r--r--src/unconfirmed_messages.erl280
-rw-r--r--test/confirms_rejects_SUITE.erl118
4 files changed, 478 insertions, 244 deletions
diff --git a/src/dtree.erl b/src/dtree.erl
deleted file mode 100644
index 6f4d102c10..0000000000
--- a/src/dtree.erl
+++ /dev/null
@@ -1,205 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at https://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved.
-%%
-
-%% A dual-index tree.
-%%
-%% Entries have the following shape:
-%%
-%% +----+--------------------+---+
-%% | PK | SK1, SK2, ..., SKN | V |
-%% +----+--------------------+---+
-%%
-%% i.e. a primary key, set of secondary keys, and a value.
-%%
-%% There can be only one entry per primary key, but secondary keys may
-%% appear in multiple entries.
-%%
-%% The set of secondary keys must be non-empty. Or, to put it another
-%% way, entries only exist while their secondary key set is non-empty.
-
--module(dtree).
-
--export([empty/0, insert/4, take/3, take/2, take_one/2, take_all/2, drop/2,
- is_defined/2, is_empty/1, smallest/1, size/1]).
-
-%%----------------------------------------------------------------------------
-
--export_type([?MODULE/0]).
-
--opaque ?MODULE() :: {gb_trees:tree(), gb_trees:tree()}.
-
--type pk() :: any().
--type sk() :: any().
--type val() :: any().
--type kv() :: {pk(), val()}.
-
-%%----------------------------------------------------------------------------
-
--spec empty() -> ?MODULE().
-
-empty() -> {gb_trees:empty(), gb_trees:empty()}.
-
-%% Insert an entry. Fails if there already is an entry with the given
-%% primary key.
-
--spec insert(pk(), [sk()], val(), ?MODULE()) -> ?MODULE().
-
-insert(PK, [], V, {P, S}) ->
- %% dummy insert to force error if PK exists
- _ = gb_trees:insert(PK, {gb_sets:empty(), V}, P),
- {P, S};
-insert(PK, SKs, V, {P, S}) ->
- {gb_trees:insert(PK, {gb_sets:from_list(SKs), V}, P),
- lists:foldl(fun (SK, S0) ->
- case gb_trees:lookup(SK, S0) of
- {value, PKS} -> PKS1 = gb_sets:insert(PK, PKS),
- gb_trees:update(SK, PKS1, S0);
- none -> PKS = gb_sets:singleton(PK),
- gb_trees:insert(SK, PKS, S0)
- end
- end, S, SKs)}.
-
-%% Remove the given secondary key from the entries of the given
-%% primary keys, returning the primary-key/value pairs of any entries
-%% that were dropped as the result (i.e. due to their secondary key
-%% set becoming empty). It is ok for the given primary keys and/or
-%% secondary key to not exist.
-
--spec take([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}.
-
-take(PKs, SK, {P, S}) ->
- case gb_trees:lookup(SK, S) of
- none -> {[], {P, S}};
- {value, PKS} -> TakenPKS = gb_sets:from_list(PKs),
- PKSInter = gb_sets:intersection(PKS, TakenPKS),
- PKSDiff = gb_sets_difference (PKS, PKSInter),
- {KVs, P1} = take2(PKSInter, SK, P),
- {KVs, {P1, case gb_sets:is_empty(PKSDiff) of
- true -> gb_trees:delete(SK, S);
- false -> gb_trees:update(SK, PKSDiff, S)
- end}}
- end.
-
-%% Remove the given secondary key from all entries, returning the
-%% primary-key/value pairs of any entries that were dropped as the
-%% result (i.e. due to their secondary key set becoming empty). It is
-%% ok for the given secondary key to not exist.
-
--spec take(sk(), ?MODULE()) -> {[kv()], ?MODULE()}.
-
-take(SK, {P, S}) ->
- case gb_trees:lookup(SK, S) of
- none -> {[], {P, S}};
- {value, PKS} -> {KVs, P1} = take2(PKS, SK, P),
- {KVs, {P1, gb_trees:delete(SK, S)}}
- end.
-
-%% Drop an entry with the primary key and clears secondary keys for this key,
-%% returning a list with a key-value pair as a result.
-%% If the primary key does not exist, returns an empty list.
-
--spec take_one(pk(), ?MODULE()) -> {[{pk(), val()}], ?MODULE()}.
-
-take_one(PK, {P, S}) ->
- case gb_trees:lookup(PK, P) of
- {value, {SKS, Value}} ->
- P1 = gb_trees:delete(PK, P),
- S1 = gb_sets:fold(
- fun(SK, Acc) ->
- {value, PKS} = gb_trees:lookup(SK, Acc),
- PKS1 = gb_sets:delete(PK, PKS),
- case gb_sets:is_empty(PKS1) of
- true -> gb_trees:delete(SK, Acc);
- false -> gb_trees:update(SK, PKS1, Acc)
- end
- end, S, SKS),
- {[{PK, Value}], {P1, S1}};
- none -> {[], {P, S}}
- end.
-
-%% Drop all entries which contain the given secondary key, returning
-%% the primary-key/value pairs of these entries. It is ok for the
-%% given secondary key to not exist.
-
--spec take_all(sk(), ?MODULE()) -> {[kv()], ?MODULE()}.
-
-take_all(SK, {P, S}) ->
- case gb_trees:lookup(SK, S) of
- none -> {[], {P, S}};
- {value, PKS} -> {KVs, SKS, P1} = take_all2(PKS, P),
- {KVs, {P1, prune(SKS, PKS, S)}}
- end.
-
-%% Drop all entries for the given primary key (which does not have to exist).
-
--spec drop(pk(), ?MODULE()) -> ?MODULE().
-
-drop(PK, {P, S}) ->
- case gb_trees:lookup(PK, P) of
- none -> {P, S};
- {value, {SKS, _V}} -> {gb_trees:delete(PK, P),
- prune(SKS, gb_sets:singleton(PK), S)}
- end.
-
--spec is_defined(sk(), ?MODULE()) -> boolean().
-
-is_defined(SK, {_P, S}) -> gb_trees:is_defined(SK, S).
-
--spec is_empty(?MODULE()) -> boolean().
-
-is_empty({P, _S}) -> gb_trees:is_empty(P).
-
--spec smallest(?MODULE()) -> kv().
-
-smallest({P, _S}) -> {K, {_SKS, V}} = gb_trees:smallest(P),
- {K, V}.
-
--spec size(?MODULE()) -> non_neg_integer().
-
-size({P, _S}) -> gb_trees:size(P).
-
-%%----------------------------------------------------------------------------
-
-take2(PKS, SK, P) ->
- gb_sets:fold(fun (PK, {KVs, P0}) ->
- {SKS, V} = gb_trees:get(PK, P0),
- SKS1 = gb_sets:delete(SK, SKS),
- case gb_sets:is_empty(SKS1) of
- true -> KVs1 = [{PK, V} | KVs],
- {KVs1, gb_trees:delete(PK, P0)};
- false -> {KVs, gb_trees:update(PK, {SKS1, V}, P0)}
- end
- end, {[], P}, PKS).
-
-take_all2(PKS, P) ->
- gb_sets:fold(fun (PK, {KVs, SKS0, P0}) ->
- {SKS, V} = gb_trees:get(PK, P0),
- {[{PK, V} | KVs], gb_sets:union(SKS, SKS0),
- gb_trees:delete(PK, P0)}
- end, {[], gb_sets:empty(), P}, PKS).
-
-prune(SKS, PKS, S) ->
- gb_sets:fold(fun (SK0, S0) ->
- PKS1 = gb_trees:get(SK0, S0),
- PKS2 = gb_sets_difference(PKS1, PKS),
- case gb_sets:is_empty(PKS2) of
- true -> gb_trees:delete(SK0, S0);
- false -> gb_trees:update(SK0, PKS2, S0)
- end
- end, S, SKS).
-
-gb_sets_difference(S1, S2) ->
- gb_sets:fold(fun gb_sets:delete_any/2, S1, S2).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 19d4449e21..7d4b7bde4a 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -155,7 +155,7 @@
confirm_enabled,
%% publisher confirm delivery tag sequence
publish_seqno,
- %% a dtree used to track unconfirmed
+ %% an unconfirmed_messages data structure used to track unconfirmed
%% (to publishers) messages
unconfirmed,
%% a list of tags for published messages that were
@@ -518,7 +518,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
delivering_queues = sets:new(),
confirm_enabled = false,
publish_seqno = 1,
- unconfirmed = dtree:empty(),
+ unconfirmed = unconfirmed_messages:new(),
rejected = [],
confirmed = [],
reply_consumer = none,
@@ -707,10 +707,15 @@ handle_cast({mandatory_received, _MsgSeqNo}, State) ->
handle_cast({reject_publish, MsgSeqNo, _QPid}, State = #ch{unconfirmed = UC}) ->
%% It does not matter which queue rejected the message,
- %% if any queue rejected it - it should not be confirmed.
- {MXs, UC1} = dtree:take_one(MsgSeqNo, UC),
+ %% if any queue did, it should not be confirmed.
+ {MaybeRejected, UC1} = unconfirmed_messages:reject_msg(MsgSeqNo, UC),
%% NB: don't call noreply/1 since we don't want to send confirms.
- noreply_coalesce(record_rejects(MXs, State#ch{unconfirmed = UC1}));
+ case MaybeRejected of
+ not_confirmed ->
+ noreply_coalesce(State#ch{unconfirmed = UC1});
+ {rejected, MX} ->
+ noreply_coalesce(record_rejects([MX], State#ch{unconfirmed = UC1}))
+ end;
handle_cast({confirm, MsgSeqNos, QPid}, State) ->
noreply_coalesce(confirm(MsgSeqNos, QPid, State)).
@@ -766,9 +771,12 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
eol ->
State1 = handle_consuming_queue_down_or_eol(Name, State0),
State2 = handle_delivering_queue_down(Name, State1),
- %% TODO: this should use dtree:take/3
- {MXs, UC1} = dtree:take(Name, State2#ch.unconfirmed),
- State3 = record_confirms(MXs, State1#ch{unconfirmed = UC1}),
+ {ConfirmMXs, RejectMXs, UC1} =
+ unconfirmed_messages:forget_ref(Name, State2#ch.unconfirmed),
+ %% Deleted queue is a special case.
+ %% Do not nack the "rejected" messages.
+ State3 = record_confirms(ConfirmMXs ++ RejectMXs,
+ State2#ch{unconfirmed = UC1}),
erase_queue_stats(QName),
noreply_coalesce(
State3#ch{queue_states = maps:remove(Name, QueueStates),
@@ -776,7 +784,7 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
end;
_ ->
%% the assumption here is that the queue state has been cleaned up and
- %% this is a residual ra notification
+ %% this is a residual Ra notification
noreply_coalesce(State0)
end;
@@ -1818,14 +1826,33 @@ track_delivering_queue(NoAck, QPid, QName,
false -> sets:add_element(QRef, DQ)
end}.
-handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC})
+handle_publishing_queue_down(QPid, Reason,
+ State = #ch{unconfirmed = UC,
+ queue_names = QNames})
when ?IS_CLASSIC(QPid) ->
- case rabbit_misc:is_abnormal_exit(Reason) of
- true -> {MXs, UC1} = dtree:take_all(QPid, UC),
- record_rejects(MXs, State#ch{unconfirmed = UC1});
- false -> {MXs, UC1} = dtree:take(QPid, UC),
- record_confirms(MXs, State#ch{unconfirmed = UC1})
-
+ case maps:get(QPid, QNames, none) of
+ %% The queue is unknown, the confirm must have been processed already
+ none -> State;
+ _QName ->
+ case {rabbit_misc:is_abnormal_exit(Reason), Reason} of
+ {true, _} ->
+ {RejectMXs, UC1} =
+ unconfirmed_messages:reject_all_for_queue(QPid, UC),
+ record_rejects(RejectMXs, State#ch{unconfirmed = UC1});
+ {false, normal} ->
+ {ConfirmMXs, RejectMXs, UC1} =
+ unconfirmed_messages:forget_ref(QPid, UC),
+ %% Deleted queue is a special case.
+ %% Do not nack the "rejected" messages.
+ record_confirms(ConfirmMXs ++ RejectMXs,
+ State#ch{unconfirmed = UC1});
+ {false, _} ->
+ {ConfirmMXs, RejectMXs, UC1} =
+ unconfirmed_messages:forget_ref(QPid, UC),
+ State1 = record_confirms(ConfirmMXs,
+ State#ch{unconfirmed = UC1}),
+ record_rejects(RejectMXs, State1)
+ end
end;
handle_publishing_queue_down(QPid, _Reason, _State) when ?IS_QUORUM(QPid) ->
error(quorum_queues_should_never_be_monitored).
@@ -2139,21 +2166,33 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
%% frequently would in fact be more expensive in the common case.
{QNames1, QMons1} =
lists:foldl(fun (Q, {QNames0, QMons0}) when ?is_amqqueue(Q) ->
- QPid = amqqueue:get_pid(Q),
- QRef = qpid_to_ref(QPid),
- QName = amqqueue:get_name(Q),
- {case maps:is_key(QRef, QNames0) of
- true -> QNames0;
- false -> maps:put(QRef, QName, QNames0)
- end, maybe_monitor(QPid, QMons0)}
- end, {QNames, maybe_monitor_all(DeliveredQPids, QMons)}, Qs),
+ QPid = amqqueue:get_pid(Q),
+ QRef = qpid_to_ref(QPid),
+ QName = amqqueue:get_name(Q),
+ case ?IS_CLASSIC(QRef) of
+ true ->
+ SPids = amqqueue:get_slave_pids(Q),
+ NewQNames =
+ maps:from_list([{Ref, QName} || Ref <- [QRef | SPids]]),
+ {maps:merge(NewQNames, QNames0),
+ maybe_monitor_all([QPid | SPids], QMons0)};
+ false ->
+ {maps:put(QRef, QName, QNames0), QMons0}
+ end
+ end,
+ {QNames, QMons}, Qs),
State1 = State#ch{queue_names = QNames1,
queue_monitors = QMons1},
%% NB: the order here is important since basic.returns must be
%% sent before confirms.
ok = process_routing_mandatory(Mandatory, AllDeliveredQRefs,
Message, State1),
- State2 = process_routing_confirm(Confirm, AllDeliveredQRefs , MsgSeqNo,
+ AllDeliveredQNames = [ QName || QRef <- AllDeliveredQRefs,
+ {ok, QName} <- [maps:find(QRef, QNames1)]],
+ State2 = process_routing_confirm(Confirm,
+ AllDeliveredQRefs,
+ AllDeliveredQNames,
+ MsgSeqNo,
XName, State1),
case rabbit_event:stats_level(State, #ch.stats_timer) of
fine ->
@@ -2179,16 +2218,22 @@ process_routing_mandatory(_Mandatory = false,
process_routing_mandatory(_, _, _, _) ->
ok.
-process_routing_confirm(false, _, _, _, State) ->
+process_routing_confirm(false, _, _, _, _, State) ->
State;
-process_routing_confirm(true, [], MsgSeqNo, XName, State) ->
+process_routing_confirm(true, [], _, MsgSeqNo, XName, State) ->
record_confirms([{MsgSeqNo, XName}], State);
-process_routing_confirm(true, QRefs, MsgSeqNo, XName, State) ->
- State#ch{unconfirmed = dtree:insert(MsgSeqNo, QRefs, XName,
- State#ch.unconfirmed)}.
-
-confirm(MsgSeqNos, QRef, State = #ch{unconfirmed = UC}) ->
- {MXs, UC1} = dtree:take(MsgSeqNos, QRef, UC),
+process_routing_confirm(true, QRefs, QNames, MsgSeqNo, XName, State) ->
+ State#ch{unconfirmed =
+ unconfirmed_messages:insert(MsgSeqNo, QNames, QRefs, XName,
+ State#ch.unconfirmed)}.
+
+confirm(MsgSeqNos, QRef, State = #ch{queue_names = QNames, unconfirmed = UC}) ->
+ %% NOTE: if queue name does not exist here it's likely that the ref also
+ %% does not exist in unconfirmed messages.
+ %% Neither does the 'ignore' atom, so it's a reasonable fallback.
+ QName = maps:get(QRef, QNames, ignore),
+ {MXs, UC1} =
+ unconfirmed_messages:confirm_multiple_msg_ref(MsgSeqNos, QName, QRef, UC),
%% NB: don't call noreply/1 since we don't want to send confirms.
record_confirms(MXs, State#ch{unconfirmed = UC1}).
@@ -2250,9 +2295,9 @@ send_confirms(Cs, Rs, State) ->
coalesce_and_send(MsgSeqNos, NegativeMsgSeqNos, MkMsgFun, State = #ch{unconfirmed = UC}) ->
SMsgSeqNos = lists:usort(MsgSeqNos),
- UnconfirmedCutoff = case dtree:is_empty(UC) of
+ UnconfirmedCutoff = case unconfirmed_messages:is_empty(UC) of
true -> lists:last(SMsgSeqNos) + 1;
- false -> {SeqNo, _XName} = dtree:smallest(UC), SeqNo
+ false -> unconfirmed_messages:smallest(UC)
end,
Cutoff = lists:min([UnconfirmedCutoff | NegativeMsgSeqNos]),
{Ms, Ss} = lists:splitwith(fun(X) -> X < Cutoff end, SMsgSeqNos),
@@ -2271,7 +2316,7 @@ ack_len(Acks) -> lists:sum([length(L) || {ack, L} <- Acks]).
maybe_complete_tx(State = #ch{tx = {_, _}}) ->
State;
maybe_complete_tx(State = #ch{unconfirmed = UC}) ->
- case dtree:is_empty(UC) of
+ case unconfirmed_messages:is_empty(UC) of
false -> State;
true -> complete_tx(State#ch{confirmed = []})
end.
@@ -2311,7 +2356,7 @@ i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(source, #ch{cfg = #conf{source = ChSrc}}) -> ChSrc;
i(name, State) -> name(State);
i(consumer_count, #ch{consumer_mapping = CM}) -> maps:size(CM);
-i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC);
+i(messages_unconfirmed, #ch{unconfirmed = UC}) -> unconfirmed_messages:size(UC);
i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> ?QUEUE:len(UAMQ);
i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> ?QUEUE:len(Msgs);
i(messages_uncommitted, #ch{}) -> 0;
diff --git a/src/unconfirmed_messages.erl b/src/unconfirmed_messages.erl
new file mode 100644
index 0000000000..63a504a239
--- /dev/null
+++ b/src/unconfirmed_messages.erl
@@ -0,0 +1,280 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at https://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved.
+%%
+
+%% Unconfirmed messages tracking.
+%%
+%% A message should be confirmed to the publisher only when all queues confirm.
+%%
+%% Messages are published to multiple queues while each queue may be
+%% represented by several processes (queue refs).
+%%
+%% Queue refs return confirmations, rejections, can fail or disconnect.
+%% If a queue ref fails, messgae should be rejected.
+%% If all queue refs for a queue disconnect (not fail) without confirmation,
+%% messge should be rejected.
+%%
+%% For simplicity, disconnects do not return a reject until all message refs
+%% confirm or disconnect.
+
+-module(unconfirmed_messages).
+
+-export([new/0,
+ insert/5,
+ confirm_msg_ref/4,
+ confirm_multiple_msg_ref/4,
+ forget_ref/2,
+
+ reject_msg/2,
+ reject_all_for_queue/2,
+
+ smallest/1,
+ size/1,
+ is_empty/1]).
+
+%%----------------------------------------------------------------------------
+
+-export_type([?MODULE/0]).
+-define(SET_VALUE, []).
+
+-type queue_ref() :: term().
+-type msg_id() :: term().
+-type queue_name() :: rabbit_amqqueue:name().
+-type exchange_name() :: rabbit_exchange:name().
+-type map_set(Type) :: #{Type => ?SET_VALUE}.
+
+-record(msg_status, {
+ %% a set of refs waiting for confirm
+ refs = #{} :: map_set(queue_ref()),
+ %% shows which queues had at least one confirmation
+ queue_status = #{} :: #{queue_name() => confirmed | rejected},
+ exchange :: exchange_name()
+}).
+
+-record(unconfirmed, {
+ %% needed to get unconfirmed cutoff
+ ordered = gb_sets:new() :: gb_sets:set(msg_id()),
+ %% contains message statuses of all message IDs
+ index = #{} :: #{msg_id() => #msg_status{}},
+ %% needed to look up message IDs for a queue ref
+ reverse = #{} :: #{queue_ref() => #{msg_id() => ?SET_VALUE}}
+}).
+
+-opaque ?MODULE() :: #unconfirmed{}.
+
+%%----------------------------------------------------------------------------
+
+-spec new() -> ?MODULE().
+new() -> #unconfirmed{}.
+
+%% Insert an entry for the message ID. Fails if there already is
+%% an entry with the given ID.
+-spec insert(msg_id(), [queue_name()], [queue_ref()], exchange_name(), ?MODULE()) -> ?MODULE().
+insert(MsgId, QueueNames, QueueRefs, XName,
+ #unconfirmed{ordered = Ordered,
+ index = Index,
+ reverse = Reverse} = UC) ->
+ case maps:get(MsgId, Index, none) of
+ none ->
+ UC#unconfirmed{
+ ordered = gb_sets:add(MsgId, Ordered),
+ index =
+ Index#{MsgId =>
+ #msg_status{
+ refs = maps:from_list([{QR, ?SET_VALUE} || QR <- QueueRefs]),
+ queue_status = maps:from_list([{QN, rejected} || QN <- QueueNames]),
+ exchange = XName}},
+ reverse = lists:foldl(
+ fun
+ (Ref, R) ->
+ case R of
+ #{Ref := MsgIdsSet} ->
+ R#{Ref => MsgIdsSet#{MsgId => ?SET_VALUE}};
+ _ ->
+ R#{Ref => #{MsgId => ?SET_VALUE}}
+ end
+ end,
+ Reverse, QueueRefs)
+ };
+ _ ->
+ error({message_already_exists, MsgId, QueueNames, QueueRefs, XName, UC})
+ end.
+
+%% Confirms a message on behalf of the given queue. If it was the last queue (ref)
+%% on the waiting list, returns 'confirmed' and performs the necessary cleanup.
+-spec confirm_msg_ref(msg_id(), queue_name(), queue_ref(), ?MODULE()) ->
+ {{confirmed | rejected, {msg_id(), exchange_name()}} | not_confirmed, ?MODULE()}.
+confirm_msg_ref(MsgId, QueueName, QueueRef,
+ #unconfirmed{reverse = Reverse} = UC) ->
+ remove_msg_ref(confirm, MsgId, QueueName, QueueRef,
+ UC#unconfirmed{reverse = remove_from_reverse(QueueRef, [MsgId], Reverse)}).
+
+-spec confirm_multiple_msg_ref(msg_id(), queue_name(), queue_ref(), ?MODULE()) ->
+ {{confirmed | rejected, {msg_id(), exchange_name()}} | not_confirmed, ?MODULE()}.
+confirm_multiple_msg_ref(MsgIds, QueueName, QueueRef,
+ #unconfirmed{reverse = Reverse} = UC0) ->
+ lists:foldl(
+ fun(MsgId, {C, UC}) ->
+ case remove_msg_ref(confirm, MsgId, QueueName, QueueRef, UC) of
+ {{confirmed, V}, UC1} -> {[V | C], UC1};
+ {not_confirmed, UC1} -> {C, UC1}
+ end
+ end,
+ {[], UC0#unconfirmed{reverse = remove_from_reverse(QueueRef, MsgIds, Reverse)}},
+ MsgIds).
+
+%% Removes all messages for a queue.
+%% Returns lists of confirmed and rejected messages.
+%%
+%% If there are no more refs left for the message, either
+%% 'confirmed' or 'rejected'.
+%% 'confirmed' is returned if all queues have confirmed the message.
+-spec forget_ref(queue_ref(), ?MODULE()) ->
+ {Confirmed :: [{msg_id(), exchange_name()}],
+ Rejected :: [{msg_id(), exchange_name()}],
+ ?MODULE()}.
+forget_ref(QueueRef, #unconfirmed{reverse = Reverse0} = UC0) ->
+ MsgIds = maps:keys(maps:get(QueueRef, Reverse0, #{})),
+ lists:foldl(fun(MsgId, {C, R, UC}) ->
+ case remove_msg_ref(no_confirm, MsgId, ignore, QueueRef, UC) of
+ {not_confirmed, UC1} -> {C, R, UC1};
+ {{confirmed, V}, UC1} -> {[V | C], R, UC1};
+ {{rejected, V}, UC1} -> {C, [V | R], UC1}
+ end
+ end,
+ {[], [], UC0#unconfirmed{reverse = maps:remove(QueueRef, Reverse0)}},
+ MsgIds).
+
+%% Rejects a single message with the given ID.
+%% Returns 'rejected' if there was a message with
+%% such ID.
+-spec reject_msg(msg_id(), ?MODULE()) ->
+ {{rejected, {msg_id(), exchange_name()}} | not_confirmed, ?MODULE()}.
+reject_msg(MsgId, #unconfirmed{ordered = Ordered, index = Index, reverse = Reverse} = UC) ->
+ case maps:get(MsgId, Index, none) of
+ none ->
+ {not_confirmed, UC};
+ #msg_status{exchange = XName,
+ refs = Refs} ->
+ {{rejected, {MsgId, XName}},
+ UC#unconfirmed{ordered = gb_sets:del_element(MsgId, Ordered),
+ index = maps:remove(MsgId, Index),
+ reverse = remove_multiple_from_reverse(maps:keys(Refs), [MsgId], Reverse)}}
+ end.
+
+%% Rejects all pending messages for a queue.
+-spec reject_all_for_queue(queue_ref(), ?MODULE()) ->
+ {Rejected :: [{msg_id(), exchange_name()}], ?MODULE()}.
+reject_all_for_queue(QueueRef, #unconfirmed{reverse = Reverse0} = UC0) ->
+ MsgIds = maps:keys(maps:get(QueueRef, Reverse0, #{})),
+ lists:foldl(fun(MsgId, {R, UC}) ->
+ case reject_msg(MsgId, UC) of
+ {not_confirmed, UC1} -> {R, UC1};
+ {{rejected, V}, UC1} -> {[V | R], UC1}
+ end
+ end,
+ {[], UC0#unconfirmed{reverse = maps:remove(QueueRef, Reverse0)}},
+ MsgIds).
+
+%% Returns a smallest message id.
+-spec smallest(?MODULE()) -> msg_id().
+smallest(#unconfirmed{ordered = Ordered}) ->
+ gb_sets:smallest(Ordered).
+
+-spec size(?MODULE()) -> msg_id().
+size(#unconfirmed{index = Index}) -> maps:size(Index).
+
+-spec is_empty(?MODULE()) -> boolean().
+is_empty(#unconfirmed{index = Index, reverse = Reverse, ordered = Ordered} = UC) ->
+ case maps:size(Index) == 0 of
+ true ->
+ %% Assertion
+ case maps:size(Reverse) == gb_sets:size(Ordered)
+ andalso
+ maps:size(Reverse) == 0 of
+ true -> ok;
+ false -> error({size_mismatch, UC})
+ end,
+ true;
+ _ ->
+ false
+ end.
+
+-spec remove_from_reverse(queue_ref(), [msg_id()],
+ #{queue_ref() => #{msg_id() => ?SET_VALUE}}) ->
+ #{queue_ref() => #{msg_id() => ?SET_VALUE}}.
+remove_from_reverse(QueueRef, MsgIds, Reverse) when is_list(MsgIds) ->
+ case maps:get(QueueRef, Reverse, none) of
+ none ->
+ Reverse;
+ MsgIdsSet ->
+ NewMsgIdsSet = maps:without(MsgIds, MsgIdsSet),
+ case maps:size(NewMsgIdsSet) > 0 of
+ true -> Reverse#{QueueRef => NewMsgIdsSet};
+ false -> maps:remove(QueueRef, Reverse)
+ end
+ end.
+
+-spec remove_multiple_from_reverse([queue_ref()], [msg_id()],
+ #{queue_ref() => #{msg_id() => ?SET_VALUE}}) ->
+ #{queue_ref() => #{msg_id() => ?SET_VALUE}}.
+remove_multiple_from_reverse(Refs, MsgIds, Reverse0) ->
+ lists:foldl(
+ fun(Ref, Reverse) ->
+ remove_from_reverse(Ref, MsgIds, Reverse)
+ end,
+ Reverse0,
+ Refs).
+
+-spec remove_msg_ref(confirm | no_confirm, msg_id(), queue_name(), queue_ref(), ?MODULE()) ->
+ {{confirmed | rejected, {msg_id(), exchange_name()}} | not_confirmed,
+ ?MODULE()}.
+remove_msg_ref(Confirm, MsgId, QueueName, QueueRef,
+ #unconfirmed{ordered = Ordered, index = Index} = UC) ->
+ case maps:get(MsgId, Index, none) of
+ none ->
+ {not_confirmed, UC};
+ #msg_status{refs = #{QueueRef := ?SET_VALUE} = Refs,
+ queue_status = QStatus,
+ exchange = XName} = MsgStatus ->
+ QStatus1 = case {Confirm, QueueName} of
+ {no_confirm, _} -> QStatus;
+ {_, ignore} -> QStatus;
+ {confirm, _} -> QStatus#{QueueName => confirmed}
+ end,
+ case maps:size(Refs) == 1 of
+ true ->
+ {{confirm_status(QStatus1), {MsgId, XName}},
+ UC#unconfirmed{
+ ordered = gb_sets:del_element(MsgId, Ordered),
+ index = maps:remove(MsgId, Index)}};
+ false ->
+ {not_confirmed,
+ UC#unconfirmed{
+ index = Index#{MsgId =>
+ MsgStatus#msg_status{
+ refs = maps:remove(QueueRef, Refs),
+ queue_status = QStatus1}}}}
+ end;
+ _ -> {not_confirmed, UC}
+ end.
+
+-spec confirm_status(#{queue_name() => confirmed | rejected}) -> confirmed | rejected.
+confirm_status(QueueStatus) ->
+ case lists:all(fun(confirmed) -> true; (_) -> false end,
+ maps:values(QueueStatus)) of
+ true -> confirmed;
+ false -> rejected
+ end.
diff --git a/test/confirms_rejects_SUITE.erl b/test/confirms_rejects_SUITE.erl
index 722b5240fc..8805b71ffe 100644
--- a/test/confirms_rejects_SUITE.erl
+++ b/test/confirms_rejects_SUITE.erl
@@ -14,7 +14,9 @@ groups() ->
[
{parallel_tests, [parallel], [
confirms_rejects_conflict,
- policy_resets_to_default
+ policy_resets_to_default,
+ dead_queue_rejects,
+ mixed_dead_alive_queues_reject
]}
].
@@ -45,7 +47,10 @@ init_per_testcase(policy_resets_to_default = Testcase, Config) ->
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
rabbit_ct_helpers:testcase_started(
rabbit_ct_helpers:set_config(Config, [{conn, Conn}]), Testcase);
-init_per_testcase(confirms_rejects_conflict = Testcase, Config) ->
+init_per_testcase(Testcase, Config)
+ when Testcase == confirms_rejects_conflict;
+ Testcase == dead_queue_rejects;
+ Testcase == mixed_dead_alive_queues_reject ->
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
@@ -66,6 +71,19 @@ end_per_testcase(policy_resets_to_default = Testcase, Config) ->
end_per_testcase(confirms_rejects_conflict = Testcase, Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
amqp_channel:call(Ch, #'queue.delete'{queue = <<"confirms_rejects_conflict">>}),
+ end_per_testcase0(Testcase, Config);
+end_per_testcase(dead_queue_rejects = Testcase, Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ amqp_channel:call(Ch, #'queue.delete'{queue = <<"dead_queue_rejects">>}),
+ end_per_testcase0(Testcase, Config);
+end_per_testcase(mixed_dead_alive_queues_reject = Testcase, Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ amqp_channel:call(Ch, #'queue.delete'{queue = <<"mixed_dead_alive_queues_reject_dead">>}),
+ amqp_channel:call(Ch, #'queue.delete'{queue = <<"mixed_dead_alive_queues_reject_alive">>}),
+ amqp_channel:call(Ch, #'exchange.delete'{exchange = <<"mixed_dead_alive_queues_reject">>}),
+ end_per_testcase0(Testcase, Config).
+
+end_per_testcase0(Testcase, Config) ->
rabbit_ct_client_helpers:close_channels_and_connection(Config, 0),
Conn = ?config(conn, Config),
@@ -74,9 +92,89 @@ end_per_testcase(confirms_rejects_conflict = Testcase, Config) ->
rabbit_ct_client_helpers:close_connection(Conn),
rabbit_ct_client_helpers:close_connection(Conn1),
+ clean_acks_mailbox(),
+
rabbit_ct_helpers:testcase_finished(Config, Testcase).
+dead_queue_rejects(Config) ->
+ Conn = ?config(conn, Config),
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+ QueueName = <<"dead_queue_rejects">>,
+ amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+
+ amqp_channel:call(Ch, #'queue.declare'{queue = QueueName,
+ durable = true}),
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QueueName},
+ #amqp_msg{payload = <<"HI">>}),
+
+ receive
+ {'basic.ack',_,_} -> ok
+ after 10000 ->
+ error(timeout_waiting_for_initial_ack)
+ end,
+
+ kill_the_queue(QueueName, Config),
+
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QueueName},
+ #amqp_msg{payload = <<"HI">>}),
+
+ receive
+ {'basic.ack',_,_} -> error(expecting_nack_got_ack);
+ {'basic.nack',_,_,_} -> ok
+ after 10000 ->
+ error(timeout_waiting_for_nack)
+ end.
+
+mixed_dead_alive_queues_reject(Config) ->
+ Conn = ?config(conn, Config),
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+ QueueNameDead = <<"mixed_dead_alive_queues_reject_dead">>,
+ QueueNameAlive = <<"mixed_dead_alive_queues_reject_alive">>,
+ ExchangeName = <<"mixed_dead_alive_queues_reject">>,
+
+ amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+
+ amqp_channel:call(Ch, #'queue.declare'{queue = QueueNameDead,
+ durable = true}),
+ amqp_channel:call(Ch, #'queue.declare'{queue = QueueNameAlive,
+ durable = true}),
+
+ amqp_channel:call(Ch, #'exchange.declare'{exchange = ExchangeName,
+ durable = true}),
+
+ amqp_channel:call(Ch, #'queue.bind'{exchange = ExchangeName,
+ queue = QueueNameAlive,
+ routing_key = <<"route">>}),
+
+ amqp_channel:call(Ch, #'queue.bind'{exchange = ExchangeName,
+ queue = QueueNameDead,
+ routing_key = <<"route">>}),
+
+ amqp_channel:call(Ch, #'basic.publish'{exchange = ExchangeName,
+ routing_key = <<"route">>},
+ #amqp_msg{payload = <<"HI">>}),
+
+ receive
+ {'basic.ack',_,_} -> ok
+ after 10000 ->
+ error(timeout_waiting_for_initial_ack)
+ end,
+
+ kill_the_queue(QueueNameDead, Config),
+
+ amqp_channel:call(Ch, #'basic.publish'{exchange = ExchangeName,
+ routing_key = <<"route">>},
+ #amqp_msg{payload = <<"HI">>}),
+
+ receive
+ {'basic.nack',_,_,_} -> ok;
+ {'basic.ack',_,_} -> error(expecting_nack_got_ack)
+ after 10000 ->
+ error(timeout_waiting_for_ack)
+ end.
confirms_rejects_conflict(Config) ->
Conn = ?config(conn, Config),
@@ -256,3 +354,19 @@ clean_acks_mailbox() ->
after
1000 -> done
end.
+
+kill_the_queue(QueueName, Config) ->
+ rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, kill_the_queue, [QueueName]).
+
+kill_the_queue(QueueName) ->
+ [begin
+ {ok, Q} = rabbit_amqqueue:lookup({resource, <<"/">>, queue, QueueName}),
+ Pid = amqqueue:get_pid(Q),
+ exit(Pid, kill)
+ end
+ || _ <- lists:seq(1, 11)].
+
+
+
+
+