diff options
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 17 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 5 | ||||
-rw-r--r-- | src/unconfirmed_messages.erl | 41 |
3 files changed, 34 insertions, 29 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index aa60503a70..2185d7c95f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -603,13 +603,22 @@ confirm_messages(MsgIds, MTC) -> none -> {CMs, MTC0}; {SenderPid, MsgSeqNo} -> - {rabbit_misc:gb_trees_cons(SenderPid, - MsgSeqNo, CMs), + {maps:update_with(SenderPid, + fun(MsgSeqNos) -> + [MsgSeqNo | MsgSeqNos] + end, + [MsgSeqNo], + CMs), maps:remove(MsgId, MTC0)} end - end, {gb_trees:empty(), MTC}, MsgIds), - rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs), + end, {#{}, MTC}, MsgIds), + maps:fold( + fun(Pid, MsgSeqNos, _) -> + rabbit_misc:confirm_to_sender(Pid, MsgSeqNos) + end, + ok, + CMs), MTC1. send_or_record_confirm(#delivery{confirm = false}, State) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index c0f1ff5dc8..dd063dbff7 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -2233,10 +2233,11 @@ confirm(MsgSeqNos, QRef, State = #ch{queue_names = QNames, unconfirmed = UC}) -> %% does not exist in unconfirmed messages. %% Neither does the 'ignore' atom, so it's a reasonable fallback. QName = maps:get(QRef, QNames, ignore), - {MXs, UC1} = + {ConfirmMXs, RejectMXs, UC1} = unconfirmed_messages:confirm_multiple_msg_ref(MsgSeqNos, QName, QRef, UC), %% NB: don't call noreply/1 since we don't want to send confirms. - record_confirms(MXs, State#ch{unconfirmed = UC1}). + State1 = record_confirms(ConfirmMXs, State#ch{unconfirmed = UC1}), + record_rejects(RejectMXs, State1). send_confirms_and_nacks(State = #ch{tx = none, confirmed = [], rejected = []}) -> State; diff --git a/src/unconfirmed_messages.erl b/src/unconfirmed_messages.erl index 6b4435800c..0a4b533448 100644 --- a/src/unconfirmed_messages.erl +++ b/src/unconfirmed_messages.erl @@ -33,7 +33,6 @@ -export([new/0, insert/5, - confirm_msg_ref/4, confirm_multiple_msg_ref/4, forget_ref/2, @@ -112,27 +111,22 @@ insert(MsgId, QueueNames, QueueRefs, XName, error({message_already_exists, MsgId, QueueNames, QueueRefs, XName, UC}) end. -%% Confirms a message on behalf of the given queue. If it was the last queue (ref) -%% on the waiting list, returns 'confirmed' and performs the necessary cleanup. --spec confirm_msg_ref(msg_id(), queue_name(), queue_ref(), ?MODULE()) -> - {{confirmed | rejected, {msg_id(), exchange_name()}} | not_confirmed, ?MODULE()}. -confirm_msg_ref(MsgId, QueueName, QueueRef, - #unconfirmed{reverse = Reverse} = UC) -> - remove_msg_ref(confirm, MsgId, QueueName, QueueRef, - UC#unconfirmed{reverse = remove_from_reverse(QueueRef, [MsgId], Reverse)}). - +%% Confirms messages on behalf of the given queue. If it was the last queue (ref) +%% on the waiting list, returns message id and excahnge name +%% and performs the necessary cleanup. -spec confirm_multiple_msg_ref(msg_id(), queue_name(), queue_ref(), ?MODULE()) -> - {{confirmed | rejected, {msg_id(), exchange_name()}} | not_confirmed, ?MODULE()}. + {[{msg_id(), exchange_name()}], [{msg_id(), exchange_name()}], ?MODULE()}. confirm_multiple_msg_ref(MsgIds, QueueName, QueueRef, #unconfirmed{reverse = Reverse} = UC0) -> lists:foldl( - fun(MsgId, {C, UC}) -> + fun(MsgId, {C, R, UC}) -> case remove_msg_ref(confirm, MsgId, QueueName, QueueRef, UC) of - {{confirmed, V}, UC1} -> {[V | C], UC1}; - {not_confirmed, UC1} -> {C, UC1} + {{confirmed, V}, UC1} -> {[V | C], R, UC1}; + {{rejected, V}, UC1} -> {C, [V | R], UC1}; + {not_confirmed, UC1} -> {C, R, UC1} end end, - {[], UC0#unconfirmed{reverse = remove_from_reverse(QueueRef, MsgIds, Reverse)}}, + {[], [], UC0#unconfirmed{reverse = remove_from_reverse(QueueRef, MsgIds, Reverse)}}, MsgIds). %% Removes all messages for a queue. @@ -179,14 +173,15 @@ reject_msg(MsgId, #unconfirmed{ordered = Ordered, index = Index, reverse = Rever {Rejected :: [{msg_id(), exchange_name()}], ?MODULE()}. reject_all_for_queue(QueueRef, #unconfirmed{reverse = Reverse0} = UC0) -> MsgIds = maps:keys(maps:get(QueueRef, Reverse0, #{})), - lists:foldl(fun(MsgId, {R, UC}) -> - case reject_msg(MsgId, UC) of - {not_confirmed, UC1} -> {R, UC1}; - {{rejected, V}, UC1} -> {[V | R], UC1} - end - end, - {[], UC0#unconfirmed{reverse = maps:remove(QueueRef, Reverse0)}}, - MsgIds). + lists:foldl( + fun(MsgId, {R, UC}) -> + case reject_msg(MsgId, UC) of + {not_confirmed, UC1} -> {R, UC1}; + {{rejected, V}, UC1} -> {[V | R], UC1} + end + end, + {[], UC0#unconfirmed{reverse = maps:remove(QueueRef, Reverse0)}}, + MsgIds). %% Returns a smallest message id. -spec smallest(?MODULE()) -> msg_id(). |