summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_channel.erl33
1 files changed, 27 insertions, 6 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index dbfb11c0b8..b5c5d8a784 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -262,14 +262,35 @@ handle_info(multiple_ack_flush,
confirm = #confirm{held_acks = As} } ) ->
rabbit_log:info("channel got a multiple_ack_flush message~n"
"held acks: ~p~n", [gb_sets:to_list(As)]),
- gb_sets:fold(fun(A, L) ->
- ok = rabbit_writer:send_command(
- WriterPid,
- #'basic.ack'{ delivery_tag = A }),
- L
- end, -1, As),
+ case gb_sets:is_empty(As) of
+ true -> ok;
+ false -> flush_multiple(As, WriterPid)
+ end,
{noreply, State#ch{confirm = #confirm{ held_acks = gb_sets:new()}}}.
+flush_multiple(Acks, WriterPid) ->
+ [First | Rest] = gb_sets:to_list(Acks),
+ flush_multiple(First, Rest, WriterPid).
+
+flush_multiple(Prev, [Cur | Rest], WriterPid) ->
+ ExpNext = Prev+1,
+ case Cur of
+ ExpNext ->
+ flush_multiple(Cur, Rest, WriterPid);
+ _ ->
+ flush_multiple(Prev, [], WriterPid),
+ lists:foreach(fun(A) ->
+ ok = rabbit_writer:send_command(
+ WriterPid,
+ #'basic.ack'{delivery_tag = A})
+ end, [Cur | Rest])
+ end;
+flush_multiple(Prev, [], WriterPid) ->
+ ok = rabbit_writer:send_command(
+ WriterPid,
+ #'basic.ack'{ delivery_tag = Prev,
+ multiple = true }).
+
handle_pre_hibernate(State) ->
ok = clear_permission_cache(),