summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniil Fedotov <hairyhum@gmail.com>2019-06-04 19:08:55 -0400
committerDaniil Fedotov <hairyhum@gmail.com>2019-06-04 19:10:38 -0400
commitdef400e81db176b348e8ffc2574e47d8585e7fb1 (patch)
treea0b5648cce9dcb901600a519e2ce54108fe8b89b
parent35377fc69906b7a507d0161d37f4b778dded97b8 (diff)
downloadrabbitmq-server-git-queue_confirm_refactor.tar.gz
Handle rejected queues on confirm.queue_confirm_refactor
When one queue confirms a message and another queue failed without confirming it, the message should be rejected. Fixes some races between confirms and rejects.
-rw-r--r--src/rabbit_amqqueue_process.erl17
-rw-r--r--src/rabbit_channel.erl5
-rw-r--r--src/unconfirmed_messages.erl41
3 files changed, 34 insertions, 29 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index aa60503a70..2185d7c95f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -603,13 +603,22 @@ confirm_messages(MsgIds, MTC) ->
none ->
{CMs, MTC0};
{SenderPid, MsgSeqNo} ->
- {rabbit_misc:gb_trees_cons(SenderPid,
- MsgSeqNo, CMs),
+ {maps:update_with(SenderPid,
+ fun(MsgSeqNos) ->
+ [MsgSeqNo | MsgSeqNos]
+ end,
+ [MsgSeqNo],
+ CMs),
maps:remove(MsgId, MTC0)}
end
- end, {gb_trees:empty(), MTC}, MsgIds),
- rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs),
+ end, {#{}, MTC}, MsgIds),
+ maps:fold(
+ fun(Pid, MsgSeqNos, _) ->
+ rabbit_misc:confirm_to_sender(Pid, MsgSeqNos)
+ end,
+ ok,
+ CMs),
MTC1.
send_or_record_confirm(#delivery{confirm = false}, State) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index c0f1ff5dc8..dd063dbff7 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -2233,10 +2233,11 @@ confirm(MsgSeqNos, QRef, State = #ch{queue_names = QNames, unconfirmed = UC}) ->
%% does not exist in unconfirmed messages.
%% Neither does the 'ignore' atom, so it's a reasonable fallback.
QName = maps:get(QRef, QNames, ignore),
- {MXs, UC1} =
+ {ConfirmMXs, RejectMXs, UC1} =
unconfirmed_messages:confirm_multiple_msg_ref(MsgSeqNos, QName, QRef, UC),
%% NB: don't call noreply/1 since we don't want to send confirms.
- record_confirms(MXs, State#ch{unconfirmed = UC1}).
+ State1 = record_confirms(ConfirmMXs, State#ch{unconfirmed = UC1}),
+ record_rejects(RejectMXs, State1).
send_confirms_and_nacks(State = #ch{tx = none, confirmed = [], rejected = []}) ->
State;
diff --git a/src/unconfirmed_messages.erl b/src/unconfirmed_messages.erl
index 6b4435800c..0a4b533448 100644
--- a/src/unconfirmed_messages.erl
+++ b/src/unconfirmed_messages.erl
@@ -33,7 +33,6 @@
-export([new/0,
insert/5,
- confirm_msg_ref/4,
confirm_multiple_msg_ref/4,
forget_ref/2,
@@ -112,27 +111,22 @@ insert(MsgId, QueueNames, QueueRefs, XName,
error({message_already_exists, MsgId, QueueNames, QueueRefs, XName, UC})
end.
-%% Confirms a message on behalf of the given queue. If it was the last queue (ref)
-%% on the waiting list, returns 'confirmed' and performs the necessary cleanup.
--spec confirm_msg_ref(msg_id(), queue_name(), queue_ref(), ?MODULE()) ->
- {{confirmed | rejected, {msg_id(), exchange_name()}} | not_confirmed, ?MODULE()}.
-confirm_msg_ref(MsgId, QueueName, QueueRef,
- #unconfirmed{reverse = Reverse} = UC) ->
- remove_msg_ref(confirm, MsgId, QueueName, QueueRef,
- UC#unconfirmed{reverse = remove_from_reverse(QueueRef, [MsgId], Reverse)}).
-
+%% Confirms messages on behalf of the given queue. If it was the last queue (ref)
+%% on the waiting list, returns message id and excahnge name
+%% and performs the necessary cleanup.
-spec confirm_multiple_msg_ref(msg_id(), queue_name(), queue_ref(), ?MODULE()) ->
- {{confirmed | rejected, {msg_id(), exchange_name()}} | not_confirmed, ?MODULE()}.
+ {[{msg_id(), exchange_name()}], [{msg_id(), exchange_name()}], ?MODULE()}.
confirm_multiple_msg_ref(MsgIds, QueueName, QueueRef,
#unconfirmed{reverse = Reverse} = UC0) ->
lists:foldl(
- fun(MsgId, {C, UC}) ->
+ fun(MsgId, {C, R, UC}) ->
case remove_msg_ref(confirm, MsgId, QueueName, QueueRef, UC) of
- {{confirmed, V}, UC1} -> {[V | C], UC1};
- {not_confirmed, UC1} -> {C, UC1}
+ {{confirmed, V}, UC1} -> {[V | C], R, UC1};
+ {{rejected, V}, UC1} -> {C, [V | R], UC1};
+ {not_confirmed, UC1} -> {C, R, UC1}
end
end,
- {[], UC0#unconfirmed{reverse = remove_from_reverse(QueueRef, MsgIds, Reverse)}},
+ {[], [], UC0#unconfirmed{reverse = remove_from_reverse(QueueRef, MsgIds, Reverse)}},
MsgIds).
%% Removes all messages for a queue.
@@ -179,14 +173,15 @@ reject_msg(MsgId, #unconfirmed{ordered = Ordered, index = Index, reverse = Rever
{Rejected :: [{msg_id(), exchange_name()}], ?MODULE()}.
reject_all_for_queue(QueueRef, #unconfirmed{reverse = Reverse0} = UC0) ->
MsgIds = maps:keys(maps:get(QueueRef, Reverse0, #{})),
- lists:foldl(fun(MsgId, {R, UC}) ->
- case reject_msg(MsgId, UC) of
- {not_confirmed, UC1} -> {R, UC1};
- {{rejected, V}, UC1} -> {[V | R], UC1}
- end
- end,
- {[], UC0#unconfirmed{reverse = maps:remove(QueueRef, Reverse0)}},
- MsgIds).
+ lists:foldl(
+ fun(MsgId, {R, UC}) ->
+ case reject_msg(MsgId, UC) of
+ {not_confirmed, UC1} -> {R, UC1};
+ {{rejected, V}, UC1} -> {[V | R], UC1}
+ end
+ end,
+ {[], UC0#unconfirmed{reverse = maps:remove(QueueRef, Reverse0)}},
+ MsgIds).
%% Returns a smallest message id.
-spec smallest(?MODULE()) -> msg_id().