summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDaniil Fedotov <hairyhum@gmail.com>2019-04-03 18:10:20 -0400
committerDaniil Fedotov <hairyhum@gmail.com>2019-05-10 17:42:40 -0400
commitb7f29c1dad72eefbcafe2a7b7475777c83db53f9 (patch)
tree996e4cf58b7a5575c6ca5ef2f308e1d64a2237b2 /src
parent5c80bea709f4c89db3d8652f4a3e7d8421efb76e (diff)
downloadrabbitmq-server-git-b7f29c1dad72eefbcafe2a7b7475777c83db53f9.tar.gz
Change publisher confirms behaviour to reject messages if no queues confirmed.
Channel is counting unacked messages in a remove-only data structure `dtree`. Each published message id is associated with a list of queues where it was routed to. On confirm of queue failures queues were removed from the list As soon as there are no queues in the list - the message can be confirmed. This meant that if all queues fail with "not abnormal" reasons - the message may be confirmed, but not enqueued. This change removes dtree data structure, replacing it with specific unconfirmed_messages data structure. It tracks queue pids similarly to dtree, but also has an API to record confirms and failures differently, keeping track of which queues received at least one confirm. If all pids fails or confirm, but not all queues received confirmation - it means not all queues enqueued the message and the message should be rejected This is different from the current behaviour, but corresponds to the docs and common sense. [#163952410]
Diffstat (limited to 'src')
-rw-r--r--src/dtree.erl205
-rw-r--r--src/rabbit_channel.erl115
-rw-r--r--src/unconfirmed_messages.erl292
3 files changed, 372 insertions, 240 deletions
diff --git a/src/dtree.erl b/src/dtree.erl
deleted file mode 100644
index 6f4d102c10..0000000000
--- a/src/dtree.erl
+++ /dev/null
@@ -1,205 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at https://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved.
-%%
-
-%% A dual-index tree.
-%%
-%% Entries have the following shape:
-%%
-%% +----+--------------------+---+
-%% | PK | SK1, SK2, ..., SKN | V |
-%% +----+--------------------+---+
-%%
-%% i.e. a primary key, set of secondary keys, and a value.
-%%
-%% There can be only one entry per primary key, but secondary keys may
-%% appear in multiple entries.
-%%
-%% The set of secondary keys must be non-empty. Or, to put it another
-%% way, entries only exist while their secondary key set is non-empty.
-
--module(dtree).
-
--export([empty/0, insert/4, take/3, take/2, take_one/2, take_all/2, drop/2,
- is_defined/2, is_empty/1, smallest/1, size/1]).
-
-%%----------------------------------------------------------------------------
-
--export_type([?MODULE/0]).
-
--opaque ?MODULE() :: {gb_trees:tree(), gb_trees:tree()}.
-
--type pk() :: any().
--type sk() :: any().
--type val() :: any().
--type kv() :: {pk(), val()}.
-
-%%----------------------------------------------------------------------------
-
--spec empty() -> ?MODULE().
-
-empty() -> {gb_trees:empty(), gb_trees:empty()}.
-
-%% Insert an entry. Fails if there already is an entry with the given
-%% primary key.
-
--spec insert(pk(), [sk()], val(), ?MODULE()) -> ?MODULE().
-
-insert(PK, [], V, {P, S}) ->
- %% dummy insert to force error if PK exists
- _ = gb_trees:insert(PK, {gb_sets:empty(), V}, P),
- {P, S};
-insert(PK, SKs, V, {P, S}) ->
- {gb_trees:insert(PK, {gb_sets:from_list(SKs), V}, P),
- lists:foldl(fun (SK, S0) ->
- case gb_trees:lookup(SK, S0) of
- {value, PKS} -> PKS1 = gb_sets:insert(PK, PKS),
- gb_trees:update(SK, PKS1, S0);
- none -> PKS = gb_sets:singleton(PK),
- gb_trees:insert(SK, PKS, S0)
- end
- end, S, SKs)}.
-
-%% Remove the given secondary key from the entries of the given
-%% primary keys, returning the primary-key/value pairs of any entries
-%% that were dropped as the result (i.e. due to their secondary key
-%% set becoming empty). It is ok for the given primary keys and/or
-%% secondary key to not exist.
-
--spec take([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}.
-
-take(PKs, SK, {P, S}) ->
- case gb_trees:lookup(SK, S) of
- none -> {[], {P, S}};
- {value, PKS} -> TakenPKS = gb_sets:from_list(PKs),
- PKSInter = gb_sets:intersection(PKS, TakenPKS),
- PKSDiff = gb_sets_difference (PKS, PKSInter),
- {KVs, P1} = take2(PKSInter, SK, P),
- {KVs, {P1, case gb_sets:is_empty(PKSDiff) of
- true -> gb_trees:delete(SK, S);
- false -> gb_trees:update(SK, PKSDiff, S)
- end}}
- end.
-
-%% Remove the given secondary key from all entries, returning the
-%% primary-key/value pairs of any entries that were dropped as the
-%% result (i.e. due to their secondary key set becoming empty). It is
-%% ok for the given secondary key to not exist.
-
--spec take(sk(), ?MODULE()) -> {[kv()], ?MODULE()}.
-
-take(SK, {P, S}) ->
- case gb_trees:lookup(SK, S) of
- none -> {[], {P, S}};
- {value, PKS} -> {KVs, P1} = take2(PKS, SK, P),
- {KVs, {P1, gb_trees:delete(SK, S)}}
- end.
-
-%% Drop an entry with the primary key and clears secondary keys for this key,
-%% returning a list with a key-value pair as a result.
-%% If the primary key does not exist, returns an empty list.
-
--spec take_one(pk(), ?MODULE()) -> {[{pk(), val()}], ?MODULE()}.
-
-take_one(PK, {P, S}) ->
- case gb_trees:lookup(PK, P) of
- {value, {SKS, Value}} ->
- P1 = gb_trees:delete(PK, P),
- S1 = gb_sets:fold(
- fun(SK, Acc) ->
- {value, PKS} = gb_trees:lookup(SK, Acc),
- PKS1 = gb_sets:delete(PK, PKS),
- case gb_sets:is_empty(PKS1) of
- true -> gb_trees:delete(SK, Acc);
- false -> gb_trees:update(SK, PKS1, Acc)
- end
- end, S, SKS),
- {[{PK, Value}], {P1, S1}};
- none -> {[], {P, S}}
- end.
-
-%% Drop all entries which contain the given secondary key, returning
-%% the primary-key/value pairs of these entries. It is ok for the
-%% given secondary key to not exist.
-
--spec take_all(sk(), ?MODULE()) -> {[kv()], ?MODULE()}.
-
-take_all(SK, {P, S}) ->
- case gb_trees:lookup(SK, S) of
- none -> {[], {P, S}};
- {value, PKS} -> {KVs, SKS, P1} = take_all2(PKS, P),
- {KVs, {P1, prune(SKS, PKS, S)}}
- end.
-
-%% Drop all entries for the given primary key (which does not have to exist).
-
--spec drop(pk(), ?MODULE()) -> ?MODULE().
-
-drop(PK, {P, S}) ->
- case gb_trees:lookup(PK, P) of
- none -> {P, S};
- {value, {SKS, _V}} -> {gb_trees:delete(PK, P),
- prune(SKS, gb_sets:singleton(PK), S)}
- end.
-
--spec is_defined(sk(), ?MODULE()) -> boolean().
-
-is_defined(SK, {_P, S}) -> gb_trees:is_defined(SK, S).
-
--spec is_empty(?MODULE()) -> boolean().
-
-is_empty({P, _S}) -> gb_trees:is_empty(P).
-
--spec smallest(?MODULE()) -> kv().
-
-smallest({P, _S}) -> {K, {_SKS, V}} = gb_trees:smallest(P),
- {K, V}.
-
--spec size(?MODULE()) -> non_neg_integer().
-
-size({P, _S}) -> gb_trees:size(P).
-
-%%----------------------------------------------------------------------------
-
-take2(PKS, SK, P) ->
- gb_sets:fold(fun (PK, {KVs, P0}) ->
- {SKS, V} = gb_trees:get(PK, P0),
- SKS1 = gb_sets:delete(SK, SKS),
- case gb_sets:is_empty(SKS1) of
- true -> KVs1 = [{PK, V} | KVs],
- {KVs1, gb_trees:delete(PK, P0)};
- false -> {KVs, gb_trees:update(PK, {SKS1, V}, P0)}
- end
- end, {[], P}, PKS).
-
-take_all2(PKS, P) ->
- gb_sets:fold(fun (PK, {KVs, SKS0, P0}) ->
- {SKS, V} = gb_trees:get(PK, P0),
- {[{PK, V} | KVs], gb_sets:union(SKS, SKS0),
- gb_trees:delete(PK, P0)}
- end, {[], gb_sets:empty(), P}, PKS).
-
-prune(SKS, PKS, S) ->
- gb_sets:fold(fun (SK0, S0) ->
- PKS1 = gb_trees:get(SK0, S0),
- PKS2 = gb_sets_difference(PKS1, PKS),
- case gb_sets:is_empty(PKS2) of
- true -> gb_trees:delete(SK0, S0);
- false -> gb_trees:update(SK0, PKS2, S0)
- end
- end, S, SKS).
-
-gb_sets_difference(S1, S2) ->
- gb_sets:fold(fun gb_sets:delete_any/2, S1, S2).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 19d4449e21..ddd36eb4ec 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -155,7 +155,7 @@
confirm_enabled,
%% publisher confirm delivery tag sequence
publish_seqno,
- %% a dtree used to track unconfirmed
+ %% an unconfirmed_messages data structure used to track unconfirmed
%% (to publishers) messages
unconfirmed,
%% a list of tags for published messages that were
@@ -518,7 +518,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
delivering_queues = sets:new(),
confirm_enabled = false,
publish_seqno = 1,
- unconfirmed = dtree:empty(),
+ unconfirmed = unconfirmed_messages:new(),
rejected = [],
confirmed = [],
reply_consumer = none,
@@ -708,9 +708,14 @@ handle_cast({mandatory_received, _MsgSeqNo}, State) ->
handle_cast({reject_publish, MsgSeqNo, _QPid}, State = #ch{unconfirmed = UC}) ->
%% It does not matter which queue rejected the message,
%% if any queue rejected it - it should not be confirmed.
- {MXs, UC1} = dtree:take_one(MsgSeqNo, UC),
+ {MaybeRejected, UC1} = unconfirmed_messages:reject_msg(MsgSeqNo, UC),
%% NB: don't call noreply/1 since we don't want to send confirms.
- noreply_coalesce(record_rejects(MXs, State#ch{unconfirmed = UC1}));
+ case MaybeRejected of
+ not_confirmed ->
+ noreply_coalesce(State#ch{unconfirmed = UC1});
+ {rejected, MX} ->
+ noreply_coalesce(record_rejects([MX], State#ch{unconfirmed = UC1}))
+ end;
handle_cast({confirm, MsgSeqNos, QPid}, State) ->
noreply_coalesce(confirm(MsgSeqNos, QPid, State)).
@@ -766,9 +771,12 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
eol ->
State1 = handle_consuming_queue_down_or_eol(Name, State0),
State2 = handle_delivering_queue_down(Name, State1),
- %% TODO: this should use dtree:take/3
- {MXs, UC1} = dtree:take(Name, State2#ch.unconfirmed),
- State3 = record_confirms(MXs, State1#ch{unconfirmed = UC1}),
+ {ConfirmMXs, RejectMXs, UC1} =
+ unconfirmed_messages:forget_ref(Name, State2#ch.unconfirmed),
+ %% Deleted queue is a special case.
+ %% Do not nack rejected messages.
+ State3 = record_confirms(ConfirmMXs ++ RejectMXs,
+ State2#ch{unconfirmed = UC1}),
erase_queue_stats(QName),
noreply_coalesce(
State3#ch{queue_states = maps:remove(Name, QueueStates),
@@ -1818,14 +1826,33 @@ track_delivering_queue(NoAck, QPid, QName,
false -> sets:add_element(QRef, DQ)
end}.
-handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC})
+handle_publishing_queue_down(QPid, Reason,
+ State = #ch{unconfirmed = UC,
+ queue_names = QNames})
when ?IS_CLASSIC(QPid) ->
- case rabbit_misc:is_abnormal_exit(Reason) of
- true -> {MXs, UC1} = dtree:take_all(QPid, UC),
- record_rejects(MXs, State#ch{unconfirmed = UC1});
- false -> {MXs, UC1} = dtree:take(QPid, UC),
- record_confirms(MXs, State#ch{unconfirmed = UC1})
-
+ case maps:get(QPid, QNames, none) of
+ %% The queue is unknown, the confirm must have been processed already
+ none -> State;
+ _QName ->
+ case {rabbit_misc:is_abnormal_exit(Reason), Reason} of
+ {true, _} ->
+ {RejectMXs, UC1} =
+ unconfirmed_messages:reject_all_for_queue(QPid, UC),
+ record_rejects(RejectMXs, State#ch{unconfirmed = UC1});
+ {false, normal} ->
+ {ConfirmMXs, RejectMXs, UC1} =
+ unconfirmed_messages:forget_ref(QPid, UC),
+ %% Deleted queue is a special case.
+ %% Do not nack rejected messages.
+ record_confirms(ConfirmMXs ++ RejectMXs,
+ State#ch{unconfirmed = UC1});
+ {false, _} ->
+ {ConfirmMXs, RejectMXs, UC1} =
+ unconfirmed_messages:forget_ref(QPid, UC),
+ State1 = record_confirms(ConfirmMXs,
+ State#ch{unconfirmed = UC1}),
+ record_rejects(RejectMXs, State1)
+ end
end;
handle_publishing_queue_down(QPid, _Reason, _State) when ?IS_QUORUM(QPid) ->
error(quorum_queues_should_never_be_monitored).
@@ -2139,21 +2166,33 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
%% frequently would in fact be more expensive in the common case.
{QNames1, QMons1} =
lists:foldl(fun (Q, {QNames0, QMons0}) when ?is_amqqueue(Q) ->
- QPid = amqqueue:get_pid(Q),
- QRef = qpid_to_ref(QPid),
- QName = amqqueue:get_name(Q),
- {case maps:is_key(QRef, QNames0) of
- true -> QNames0;
- false -> maps:put(QRef, QName, QNames0)
- end, maybe_monitor(QPid, QMons0)}
- end, {QNames, maybe_monitor_all(DeliveredQPids, QMons)}, Qs),
+ QPid = amqqueue:get_pid(Q),
+ QRef = qpid_to_ref(QPid),
+ QName = amqqueue:get_name(Q),
+ case ?IS_CLASSIC(QRef) of
+ true ->
+ SPids = amqqueue:get_slave_pids(Q),
+ NewQNames =
+ maps:from_list([{Ref, QName} || Ref <- [QRef | SPids]]),
+ {maps:merge(NewQNames, QNames0),
+ maybe_monitor_all([QPid | SPids], QMons0)};
+ false ->
+ {maps:put(QRef, QName, QNames0), QMons0}
+ end
+ end,
+ {QNames, QMons}, Qs),
State1 = State#ch{queue_names = QNames1,
queue_monitors = QMons1},
%% NB: the order here is important since basic.returns must be
%% sent before confirms.
ok = process_routing_mandatory(Mandatory, AllDeliveredQRefs,
Message, State1),
- State2 = process_routing_confirm(Confirm, AllDeliveredQRefs , MsgSeqNo,
+ AllDeliveredQNames = [ QName || QRef <- AllDeliveredQRefs,
+ {ok, QName} <- [maps:find(QRef, QNames1)]],
+ State2 = process_routing_confirm(Confirm,
+ AllDeliveredQRefs,
+ AllDeliveredQNames,
+ MsgSeqNo,
XName, State1),
case rabbit_event:stats_level(State, #ch.stats_timer) of
fine ->
@@ -2179,16 +2218,22 @@ process_routing_mandatory(_Mandatory = false,
process_routing_mandatory(_, _, _, _) ->
ok.
-process_routing_confirm(false, _, _, _, State) ->
+process_routing_confirm(false, _, _, _, _, State) ->
State;
-process_routing_confirm(true, [], MsgSeqNo, XName, State) ->
+process_routing_confirm(true, [], _, MsgSeqNo, XName, State) ->
record_confirms([{MsgSeqNo, XName}], State);
-process_routing_confirm(true, QRefs, MsgSeqNo, XName, State) ->
- State#ch{unconfirmed = dtree:insert(MsgSeqNo, QRefs, XName,
- State#ch.unconfirmed)}.
-
-confirm(MsgSeqNos, QRef, State = #ch{unconfirmed = UC}) ->
- {MXs, UC1} = dtree:take(MsgSeqNos, QRef, UC),
+process_routing_confirm(true, QRefs, QNames, MsgSeqNo, XName, State) ->
+ State#ch{unconfirmed =
+ unconfirmed_messages:insert(MsgSeqNo, QNames, QRefs, XName,
+ State#ch.unconfirmed)}.
+
+confirm(MsgSeqNos, QRef, State = #ch{queue_names = QNames, unconfirmed = UC}) ->
+ %% NOTE: if queue name does not exist here it's likely that the ref also
+ %% does not exist in unconfirmed messages.
+ %% ignore value does not change anything.
+ QName = maps:get(QRef, QNames, ignore),
+ {MXs, UC1} =
+ unconfirmed_messages:confirm_multiple_msg_ref(MsgSeqNos, QName, QRef, UC),
%% NB: don't call noreply/1 since we don't want to send confirms.
record_confirms(MXs, State#ch{unconfirmed = UC1}).
@@ -2250,9 +2295,9 @@ send_confirms(Cs, Rs, State) ->
coalesce_and_send(MsgSeqNos, NegativeMsgSeqNos, MkMsgFun, State = #ch{unconfirmed = UC}) ->
SMsgSeqNos = lists:usort(MsgSeqNos),
- UnconfirmedCutoff = case dtree:is_empty(UC) of
+ UnconfirmedCutoff = case unconfirmed_messages:is_empty(UC) of
true -> lists:last(SMsgSeqNos) + 1;
- false -> {SeqNo, _XName} = dtree:smallest(UC), SeqNo
+ false -> unconfirmed_messages:smallest(UC)
end,
Cutoff = lists:min([UnconfirmedCutoff | NegativeMsgSeqNos]),
{Ms, Ss} = lists:splitwith(fun(X) -> X < Cutoff end, SMsgSeqNos),
@@ -2271,7 +2316,7 @@ ack_len(Acks) -> lists:sum([length(L) || {ack, L} <- Acks]).
maybe_complete_tx(State = #ch{tx = {_, _}}) ->
State;
maybe_complete_tx(State = #ch{unconfirmed = UC}) ->
- case dtree:is_empty(UC) of
+ case unconfirmed_messages:is_empty(UC) of
false -> State;
true -> complete_tx(State#ch{confirmed = []})
end.
@@ -2311,7 +2356,7 @@ i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(source, #ch{cfg = #conf{source = ChSrc}}) -> ChSrc;
i(name, State) -> name(State);
i(consumer_count, #ch{consumer_mapping = CM}) -> maps:size(CM);
-i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC);
+i(messages_unconfirmed, #ch{unconfirmed = UC}) -> unconfirmed_messages:size(UC);
i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> ?QUEUE:len(UAMQ);
i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> ?QUEUE:len(Msgs);
i(messages_uncommitted, #ch{}) -> 0;
diff --git a/src/unconfirmed_messages.erl b/src/unconfirmed_messages.erl
new file mode 100644
index 0000000000..701ca115c2
--- /dev/null
+++ b/src/unconfirmed_messages.erl
@@ -0,0 +1,292 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at https://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved.
+%%
+
+%% Unconfirmed messages tracking.
+
+%% A message should be confirmed only when all queues confirm
+
+%% Messages are published to multiple queues while each queue may be
+%% represented by several processes (queue refs).
+
+%% Queue refs return confirmations, rejections, may fail or disconnect.
+%% If a queue ref fails - the messgae should be rejected.
+%% If all queue refs for a queue disconnect (not fail) without confirmation -
+%% the messge should be rejected.
+
+%% For simplicity, disconnects do not return reject until all message refs
+%% confirm or disconnect.
+
+
+-module(unconfirmed_messages).
+
+
+-export([new/0,
+ insert/5,
+ confirm_msg_ref/4,
+ confirm_multiple_msg_ref/4,
+ forget_ref/2,
+
+ reject_msg/2,
+ reject_all_for_queue/2,
+
+ smallest/1,
+ size/1,
+ is_empty/1]).
+
+%%----------------------------------------------------------------------------
+
+-export_type([?MODULE/0]).
+-define(SET_VALUE, []).
+
+-type queue_ref() :: term().
+-type msg_id() :: term().
+-type queue_name() :: rabbit_amqqueue:name().
+-type exchange_name() :: rabbit_exchange:name().
+-type map_set(Type) :: #{Type => ?SET_VALUE}.
+
+
+%% refs is a set of refs waiting for confirm
+%% queue_status shows which queues had at least one confirmation
+
+-record(msg_status,
+ {refs = #{} :: map_set(queue_ref()),
+ queue_status = #{} :: #{queue_name() => confirmed | rejected},
+ exchange :: exchange_name()}).
+
+%% ordered set is needed to get unconfirmed cutoff
+%% index contains message statuses of all message IDs
+%% reverse index is needed to locate message IDs from queue refs
+-record(unconfirmed,
+ {ordered = gb_sets:new() :: gb_sets:set(msg_id()),
+ index = #{} :: #{msg_id() => #msg_status{}},
+ reverse = #{} :: #{queue_ref() => #{msg_id() => ?SET_VALUE}}}).
+
+-opaque ?MODULE() :: #unconfirmed{}.
+
+%%----------------------------------------------------------------------------
+
+-spec new() -> ?MODULE().
+new() -> #unconfirmed{}.
+
+%% Insert and entry. Fails if there already is an entry with the given
+%% message id.
+
+-spec insert(msg_id(), [queue_name()], [queue_ref()], exchange_name(), ?MODULE()) -> ?MODULE().
+insert(MsgId, QueueNames, QueueRefs, XName,
+ #unconfirmed{ordered = Ordered,
+ index = Index,
+ reverse = Reverse} = UC) ->
+
+ case maps:get(MsgId, Index, none) of
+ none ->
+ UC#unconfirmed{
+ ordered = gb_sets:add(MsgId, Ordered),
+ index =
+ Index#{MsgId =>
+ #msg_status{
+ refs = maps:from_list([{QR, ?SET_VALUE} || QR <- QueueRefs]),
+ queue_status = maps:from_list([{QN, rejected} || QN <- QueueNames]),
+ exchange = XName}},
+ reverse = lists:foldl(
+ fun
+ (Ref, R) ->
+ case R of
+ #{Ref := MsgIdsSet} ->
+ R#{Ref => MsgIdsSet#{MsgId => ?SET_VALUE}};
+ _ ->
+ R#{Ref => #{MsgId => ?SET_VALUE}}
+ end
+ end,
+ Reverse, QueueRefs)
+ };
+ _ ->
+ error({message_already_exists, MsgId, QueueNames, QueueRefs, XName, UC})
+ end.
+
+%% Standard confirmation.
+%% Removes the queue ref from waiting, if it was the last one -
+%% return confirmed and cleanup the message id state
+%% If the ref was not the last one - update queues status for the queue
+%% to confirmed
+
+-spec confirm_msg_ref(msg_id(), queue_name(), queue_ref(), ?MODULE()) ->
+ {{confirmed | rejected, {msg_id(), exchange_name()}} | not_confirmed, ?MODULE()}.
+confirm_msg_ref(MsgId, QueueName, QueueRef,
+ #unconfirmed{reverse = Reverse} = UC) ->
+ remove_msg_ref(confirm, MsgId, QueueName, QueueRef,
+ UC#unconfirmed{reverse = remove_from_reverse(QueueRef, [MsgId], Reverse)}).
+
+-spec confirm_multiple_msg_ref(msg_id(), queue_name(), queue_ref(), ?MODULE()) ->
+ {{confirmed | rejected, {msg_id(), exchange_name()}} | not_confirmed, ?MODULE()}.
+confirm_multiple_msg_ref(MsgIds, QueueName, QueueRef,
+ #unconfirmed{reverse = Reverse} = UC0) ->
+ lists:foldl(
+ fun(MsgId, {C, UC}) ->
+ case remove_msg_ref(confirm, MsgId, QueueName, QueueRef, UC) of
+ {{confirmed, V}, UC1} -> {[V | C], UC1};
+ {not_confirmed, UC1} -> {C, UC1}
+ end
+ end,
+ {[], UC0#unconfirmed{reverse = remove_from_reverse(QueueRef, MsgIds, Reverse)}},
+ MsgIds).
+
+%% Remove queue ref from waiting for all messages based on reverse index.
+%% If there are no more refs left for the message - return either
+%% confirmed or rejected.
+%% Confirmed is returned if all queues have queue status confirmed,
+%% which means that each queue has at least one ref (process) confirmed.
+%% Returns lists of confirmed and rejected messages.
+-spec forget_ref(queue_ref(), ?MODULE()) ->
+ {Confirmed :: [{msg_id(), exchange_name()}],
+ Rejected :: [{msg_id(), exchange_name()}],
+ ?MODULE()}.
+forget_ref(QueueRef, #unconfirmed{reverse = Reverse0} = UC0) ->
+ MsgIds = maps:keys(maps:get(QueueRef, Reverse0, #{})),
+ lists:foldl(fun(MsgId, {C, R, UC}) ->
+ case remove_msg_ref(no_confirm, MsgId, ignore, QueueRef, UC) of
+ {not_confirmed, UC1} -> {C, R, UC1};
+ {{confirmed, V}, UC1} -> {[V | C], R, UC1};
+ {{rejected, V}, UC1} -> {C, [V | R], UC1}
+ end
+ end,
+ {[], [], UC0#unconfirmed{reverse = maps:remove(QueueRef, Reverse0)}},
+ MsgIds).
+
+%% Cleanup message id
+%% Return rejected if there was a message with
+%% such ID.
+-spec reject_msg(msg_id(), ?MODULE()) ->
+ {{rejected, {msg_id(), exchange_name()}} | not_confirmed, ?MODULE()}.
+reject_msg(MsgId, #unconfirmed{ordered = Ordered, index = Index, reverse = Reverse} = UC) ->
+ case maps:get(MsgId, Index, none) of
+ none ->
+ {not_confirmed, UC};
+ #msg_status{exchange = XName,
+ refs = Refs} ->
+ {{rejected, {MsgId, XName}},
+ UC#unconfirmed{ordered = gb_sets:del_element(MsgId, Ordered),
+ index = maps:remove(MsgId, Index),
+ reverse = remove_multiple_from_reverse(maps:keys(Refs), [MsgId], Reverse)}}
+ end.
+
+%% Cleanup message ids for all messages referencing ref
+%% Based on reverse index.
+%% Returns a list of rejected messages
+-spec reject_all_for_queue(queue_ref(), ?MODULE()) ->
+ {Rejected :: [{msg_id(), exchange_name()}], ?MODULE()}.
+reject_all_for_queue(QueueRef, #unconfirmed{reverse = Reverse0} = UC0) ->
+ MsgIds = maps:keys(maps:get(QueueRef, Reverse0, #{})),
+ lists:foldl(fun(MsgId, {R, UC}) ->
+ case reject_msg(MsgId, UC) of
+ {not_confirmed, UC1} -> {R, UC1};
+ {{rejected, V}, UC1} -> {[V | R], UC1}
+ end
+ end,
+ {[], UC0#unconfirmed{reverse = maps:remove(QueueRef, Reverse0)}},
+ MsgIds).
+
+%% Returns a smallest message id.
+-spec smallest(?MODULE()) -> msg_id().
+smallest(#unconfirmed{ordered = Ordered}) ->
+ gb_sets:smallest(Ordered).
+
+-spec size(?MODULE()) -> msg_id().
+size(#unconfirmed{index = Index}) -> maps:size(Index).
+
+-spec is_empty(?MODULE()) -> boolean().
+is_empty(#unconfirmed{index = Index, reverse = Reverse, ordered = Ordered} = UC) ->
+ case maps:size(Index) == 0 of
+ true ->
+ %% Assertion
+ case maps:size(Reverse) == gb_sets:size(Ordered)
+ andalso
+ maps:size(Reverse) == 0 of
+ true -> ok;
+ false -> error({size_mismatch, UC})
+ end,
+ true;
+ _ ->
+ false
+ end.
+
+-spec remove_from_reverse(queue_ref(), [msg_id()],
+ #{queue_ref() => #{msg_id() => ?SET_VALUE}}) ->
+ #{queue_ref() => #{msg_id() => ?SET_VALUE}}.
+remove_from_reverse(QueueRef, MsgIds, Reverse) when is_list(MsgIds) ->
+ case maps:get(QueueRef, Reverse, none) of
+ none ->
+ Reverse;
+ MsgIdsSet ->
+ NewMsgIdsSet = maps:without(MsgIds, MsgIdsSet),
+ case maps:size(NewMsgIdsSet) > 0 of
+ true -> Reverse#{QueueRef => NewMsgIdsSet};
+ false -> maps:remove(QueueRef, Reverse)
+ end
+ end.
+
+-spec remove_multiple_from_reverse([queue_ref()], [msg_id()],
+ #{queue_ref() => #{msg_id() => ?SET_VALUE}}) ->
+ #{queue_ref() => #{msg_id() => ?SET_VALUE}}.
+remove_multiple_from_reverse(Refs, MsgIds, Reverse0) ->
+ lists:foldl(
+ fun(Ref, Reverse) ->
+ remove_from_reverse(Ref, MsgIds, Reverse)
+ end,
+ Reverse0,
+ Refs).
+
+-spec remove_msg_ref(confirm | no_confirm, msg_id(), queue_name(), queue_ref(), ?MODULE()) ->
+ {{confirmed | rejected, {msg_id(), exchange_name()}} | not_confirmed,
+ ?MODULE()}.
+remove_msg_ref(Confirm, MsgId, QueueName, QueueRef,
+ #unconfirmed{ordered = Ordered, index = Index} = UC) ->
+ case maps:get(MsgId, Index, none) of
+ none ->
+ {not_confirmed, UC};
+ #msg_status{refs = #{QueueRef := ?SET_VALUE} = Refs,
+ queue_status = QStatus,
+ exchange = XName} = MsgStatus ->
+ QStatus1 = case {Confirm, QueueName} of
+ {no_confirm, _} -> QStatus;
+ {_, ignore} -> QStatus;
+ {confirm, _} -> QStatus#{QueueName => confirmed}
+ end,
+ case maps:size(Refs) == 1 of
+ true ->
+ {{confirm_status(QStatus1), {MsgId, XName}},
+ UC#unconfirmed{
+ ordered = gb_sets:del_element(MsgId, Ordered),
+ index = maps:remove(MsgId, Index)}};
+ false ->
+ {not_confirmed,
+ UC#unconfirmed{
+ index = Index#{MsgId =>
+ MsgStatus#msg_status{
+ refs = maps:remove(QueueRef, Refs),
+ queue_status = QStatus1}}}}
+ end;
+ _ -> {not_confirmed, UC}
+ end.
+
+-spec confirm_status(#{queue_name() => confirmed | rejected}) -> confirmed | rejected.
+confirm_status(QueueStatus) ->
+ case lists:all(fun(confirmed) -> true; (_) -> false end,
+ maps:values(QueueStatus)) of
+ true -> confirmed;
+ false -> rejected
+ end.
+
+