diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 40 |
1 files changed, 19 insertions, 21 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 36b8885f58..cc1e50f79d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -426,29 +426,27 @@ deliver_from_queue_deliver(AckRequired, false, State) -> {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. confirm_messages(Guids, State) -> - {CMs, State1} = annotate_confirms_with_channel(Guids, State), - CMs1 = group_confirms_by_channel(CMs), - [rabbit_channel:confirm(ChPid, Msgs) || {ChPid, Msgs} <- CMs1], + {CMs, State1} = + lists:foldl(fun(Guid, {CMs, State0 = #q{guid_to_channel = GTC0}}) -> + case dict:find(Guid, GTC0) of + {ok, {ChPid, MsgSeqNo}} -> + {[{ChPid, MsgSeqNo} | CMs], + State0#q{guid_to_channel = + dict:erase(Guid, GTC0)}}; + _ -> + {CMs, State0} + end + end, {[], State}, Guids), + case lists:usort(CMs) of + [{Ch, MsgSeqNo} | CMs1] -> + CMs2 = group_confirms_by_channel(CMs1, [{Ch, [MsgSeqNo]}]), + [rabbit_channel:confirm(ChPid, MsgSeqNos) + || {ChPid, MsgSeqNos} <- CMs2]; + [] -> + ok + end, State1. -annotate_confirms_with_channel(Guids, State) -> - lists:foldl(fun(Guid, {CMs, State0 = #q{guid_to_channel = GTC0}}) -> - case dict:find(Guid, GTC0) of - {ok, {ChPid, MsgSeqNo}} -> - {[{ChPid, MsgSeqNo} | CMs], - State0#q{guid_to_channel = - dict:erase(Guid, GTC0)}}; - _ -> - {CMs, State0} - end - end, {[], State}, Guids). - -group_confirms_by_channel([]) -> - []; -group_confirms_by_channel(CMs) -> - [{Ch, Msg} | CMs1] = lists:usort(CMs), - group_confirms_by_channel(CMs1, [{Ch, [Msg]}]). - group_confirms_by_channel([], Acc) -> Acc; group_confirms_by_channel([{Ch, Msg1} | CMs], [{Ch, Msgs} | Acc]) -> |
