summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-12 11:04:13 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-12 11:04:13 +0100
commitc229634a282a604c998a6d1936d9b33579bdebab (patch)
treef95e1cdd1ee8788a13fe7432c1a367d733743f31
parent1cc1cf76c916a2f09a118912ecd13ceae8306547 (diff)
downloadrabbitmq-server-git-c229634a282a604c998a6d1936d9b33579bdebab.tar.gz
support for multiple confirmations
Suppose the channel is in confirm multiple mode, and acks for the following messages are outstanding: > 3 4 5 7 10 13 The server sends one confirm-up-to 5 ack and three more acks for 7, 10 and 13. If the client receives a basic.ack, it should interpret it like this: basic.ack{delivery-tag=N, multiple=false} => the published message with sequence number N has been handled basic.ack{delivery-tag=N, multiple=true} => all the published messages with sequence numbers up to and including N have been handled
-rw-r--r--src/rabbit_channel.erl33
1 files changed, 27 insertions, 6 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index dbfb11c0b8..b5c5d8a784 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -262,14 +262,35 @@ handle_info(multiple_ack_flush,
confirm = #confirm{held_acks = As} } ) ->
rabbit_log:info("channel got a multiple_ack_flush message~n"
"held acks: ~p~n", [gb_sets:to_list(As)]),
- gb_sets:fold(fun(A, L) ->
- ok = rabbit_writer:send_command(
- WriterPid,
- #'basic.ack'{ delivery_tag = A }),
- L
- end, -1, As),
+ case gb_sets:is_empty(As) of
+ true -> ok;
+ false -> flush_multiple(As, WriterPid)
+ end,
{noreply, State#ch{confirm = #confirm{ held_acks = gb_sets:new()}}}.
+flush_multiple(Acks, WriterPid) ->
+ [First | Rest] = gb_sets:to_list(Acks),
+ flush_multiple(First, Rest, WriterPid).
+
+flush_multiple(Prev, [Cur | Rest], WriterPid) ->
+ ExpNext = Prev+1,
+ case Cur of
+ ExpNext ->
+ flush_multiple(Cur, Rest, WriterPid);
+ _ ->
+ flush_multiple(Prev, [], WriterPid),
+ lists:foreach(fun(A) ->
+ ok = rabbit_writer:send_command(
+ WriterPid,
+ #'basic.ack'{delivery_tag = A})
+ end, [Cur | Rest])
+ end;
+flush_multiple(Prev, [], WriterPid) ->
+ ok = rabbit_writer:send_command(
+ WriterPid,
+ #'basic.ack'{ delivery_tag = Prev,
+ multiple = true }).
+
handle_pre_hibernate(State) ->
ok = clear_permission_cache(),