diff options
| -rw-r--r-- | src/rabbit_channel.erl | 33 |
1 files changed, 27 insertions, 6 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index dbfb11c0b8..b5c5d8a784 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -262,14 +262,35 @@ handle_info(multiple_ack_flush, confirm = #confirm{held_acks = As} } ) -> rabbit_log:info("channel got a multiple_ack_flush message~n" "held acks: ~p~n", [gb_sets:to_list(As)]), - gb_sets:fold(fun(A, L) -> - ok = rabbit_writer:send_command( - WriterPid, - #'basic.ack'{ delivery_tag = A }), - L - end, -1, As), + case gb_sets:is_empty(As) of + true -> ok; + false -> flush_multiple(As, WriterPid) + end, {noreply, State#ch{confirm = #confirm{ held_acks = gb_sets:new()}}}. +flush_multiple(Acks, WriterPid) -> + [First | Rest] = gb_sets:to_list(Acks), + flush_multiple(First, Rest, WriterPid). + +flush_multiple(Prev, [Cur | Rest], WriterPid) -> + ExpNext = Prev+1, + case Cur of + ExpNext -> + flush_multiple(Cur, Rest, WriterPid); + _ -> + flush_multiple(Prev, [], WriterPid), + lists:foreach(fun(A) -> + ok = rabbit_writer:send_command( + WriterPid, + #'basic.ack'{delivery_tag = A}) + end, [Cur | Rest]) + end; +flush_multiple(Prev, [], WriterPid) -> + ok = rabbit_writer:send_command( + WriterPid, + #'basic.ack'{ delivery_tag = Prev, + multiple = true }). + handle_pre_hibernate(State) -> ok = clear_permission_cache(), |
