summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl40
1 files changed, 19 insertions, 21 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 36b8885f58..cc1e50f79d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -426,29 +426,27 @@ deliver_from_queue_deliver(AckRequired, false, State) ->
{{Message, IsDelivered, AckTag}, 0 == Remaining, State1}.
confirm_messages(Guids, State) ->
- {CMs, State1} = annotate_confirms_with_channel(Guids, State),
- CMs1 = group_confirms_by_channel(CMs),
- [rabbit_channel:confirm(ChPid, Msgs) || {ChPid, Msgs} <- CMs1],
+ {CMs, State1} =
+ lists:foldl(fun(Guid, {CMs, State0 = #q{guid_to_channel = GTC0}}) ->
+ case dict:find(Guid, GTC0) of
+ {ok, {ChPid, MsgSeqNo}} ->
+ {[{ChPid, MsgSeqNo} | CMs],
+ State0#q{guid_to_channel =
+ dict:erase(Guid, GTC0)}};
+ _ ->
+ {CMs, State0}
+ end
+ end, {[], State}, Guids),
+ case lists:usort(CMs) of
+ [{Ch, MsgSeqNo} | CMs1] ->
+ CMs2 = group_confirms_by_channel(CMs1, [{Ch, [MsgSeqNo]}]),
+ [rabbit_channel:confirm(ChPid, MsgSeqNos)
+ || {ChPid, MsgSeqNos} <- CMs2];
+ [] ->
+ ok
+ end,
State1.
-annotate_confirms_with_channel(Guids, State) ->
- lists:foldl(fun(Guid, {CMs, State0 = #q{guid_to_channel = GTC0}}) ->
- case dict:find(Guid, GTC0) of
- {ok, {ChPid, MsgSeqNo}} ->
- {[{ChPid, MsgSeqNo} | CMs],
- State0#q{guid_to_channel =
- dict:erase(Guid, GTC0)}};
- _ ->
- {CMs, State0}
- end
- end, {[], State}, Guids).
-
-group_confirms_by_channel([]) ->
- [];
-group_confirms_by_channel(CMs) ->
- [{Ch, Msg} | CMs1] = lists:usort(CMs),
- group_confirms_by_channel(CMs1, [{Ch, [Msg]}]).
-
group_confirms_by_channel([], Acc) ->
Acc;
group_confirms_by_channel([{Ch, Msg1} | CMs], [{Ch, Msgs} | Acc]) ->