diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-12 11:04:13 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-12 11:04:13 +0100 |
| commit | c229634a282a604c998a6d1936d9b33579bdebab (patch) | |
| tree | f95e1cdd1ee8788a13fe7432c1a367d733743f31 | |
| parent | 1cc1cf76c916a2f09a118912ecd13ceae8306547 (diff) | |
| download | rabbitmq-server-git-c229634a282a604c998a6d1936d9b33579bdebab.tar.gz | |
support for multiple confirmations
Suppose the channel is in confirm multiple mode, and acks for the
following messages are outstanding:
> 3 4 5 7 10 13
The server sends one confirm-up-to 5 ack and three more acks for 7, 10
and 13.
If the client receives a basic.ack, it should interpret it like this:
basic.ack{delivery-tag=N, multiple=false} => the published message
with sequence number N has been handled
basic.ack{delivery-tag=N, multiple=true} => all the published
messages with sequence numbers up to and including N have been
handled
| -rw-r--r-- | src/rabbit_channel.erl | 33 |
1 files changed, 27 insertions, 6 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index dbfb11c0b8..b5c5d8a784 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -262,14 +262,35 @@ handle_info(multiple_ack_flush, confirm = #confirm{held_acks = As} } ) -> rabbit_log:info("channel got a multiple_ack_flush message~n" "held acks: ~p~n", [gb_sets:to_list(As)]), - gb_sets:fold(fun(A, L) -> - ok = rabbit_writer:send_command( - WriterPid, - #'basic.ack'{ delivery_tag = A }), - L - end, -1, As), + case gb_sets:is_empty(As) of + true -> ok; + false -> flush_multiple(As, WriterPid) + end, {noreply, State#ch{confirm = #confirm{ held_acks = gb_sets:new()}}}. +flush_multiple(Acks, WriterPid) -> + [First | Rest] = gb_sets:to_list(Acks), + flush_multiple(First, Rest, WriterPid). + +flush_multiple(Prev, [Cur | Rest], WriterPid) -> + ExpNext = Prev+1, + case Cur of + ExpNext -> + flush_multiple(Cur, Rest, WriterPid); + _ -> + flush_multiple(Prev, [], WriterPid), + lists:foreach(fun(A) -> + ok = rabbit_writer:send_command( + WriterPid, + #'basic.ack'{delivery_tag = A}) + end, [Cur | Rest]) + end; +flush_multiple(Prev, [], WriterPid) -> + ok = rabbit_writer:send_command( + WriterPid, + #'basic.ack'{ delivery_tag = Prev, + multiple = true }). + handle_pre_hibernate(State) -> ok = clear_permission_cache(), |
