summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_channel.erl38
1 files changed, 33 insertions, 5 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 09d1b385c7..dbfb11c0b8 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -49,7 +49,7 @@
username, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
confirm}).
--record(confirm, {enabled, count, multiple, tref}).
+-record(confirm, {enabled, count, multiple, tref, held_acks}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -70,6 +70,8 @@
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+-define(MULTIPLE_ACK_FLUSH_INTERVAL, 5000).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -176,7 +178,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
confirm = #confirm{ enabled = false,
count = 0,
multiple = false,
- tref = not_started }},
+ tref = not_started,
+ held_acks = gb_sets:new()}},
rabbit_event:notify(
channel_created,
[{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]),
@@ -253,7 +256,20 @@ handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
erase_queue_stats(QPid),
- {noreply, queue_blocked(QPid, State)}.
+ {noreply, queue_blocked(QPid, State)};
+handle_info(multiple_ack_flush,
+ State = #ch{ writer_pid = WriterPid,
+ confirm = #confirm{held_acks = As} } ) ->
+ rabbit_log:info("channel got a multiple_ack_flush message~n"
+ "held acks: ~p~n", [gb_sets:to_list(As)]),
+ gb_sets:fold(fun(A, L) ->
+ ok = rabbit_writer:send_command(
+ WriterPid,
+ #'basic.ack'{ delivery_tag = A }),
+ L
+ end, -1, As),
+ {noreply, State#ch{confirm = #confirm{ held_acks = gb_sets:new()}}}.
+
handle_pre_hibernate(State) ->
ok = clear_permission_cache(),
@@ -400,9 +416,15 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
handle_confirm(C = #confirm{ enabled = false }, _, _) ->
C;
handle_confirm(C = #confirm{ count = Count, multiple = false }, false, WriterPid) ->
- rabbit_log:info("handling confirm in single mode (#~p)~n", [Count]),
+ rabbit_log:info("handling confirm in single transient mode (#~p)~n", [Count]),
ok = rabbit_writer:send_command(WriterPid, #'basic.ack'{ delivery_tag = Count }),
C#confirm{ count = Count+1 };
+handle_confirm(C = #confirm{ count = Count,
+ multiple = true,
+ held_acks = As }, false, _WriterPid) ->
+ rabbit_log:info("handling confirm in multiple transient mode (#~p)~n", [Count]),
+ C#confirm{ count = Count+1,
+ held_acks = gb_sets:add(Count, As) };
handle_confirm(C = #confirm{ count = Count }, IsPersistent, _WriterPid) ->
rabbit_log:info("handling confirm (#~p, persistent = ~p)~n", [Count, IsPersistent]),
C#confirm{ count = Count+1 }.
@@ -883,8 +905,14 @@ handle_method(#'confirm.select'{multiple = Multiple,
confirm = C = #confirm{enabled = false}}) ->
rabbit_log:info("got confirm.select{multiple = ~p, nowait = ~p}~n",
[Multiple, NoWait]),
+ TRef = case Multiple of
+ false -> not_started;
+ true -> timer:send_interval(?MULTIPLE_ACK_FLUSH_INTERVAL,
+ multiple_ack_flush)
+ end,
State1 = State#ch{confirm = C#confirm{ enabled = true,
- multiple = Multiple }},
+ multiple = Multiple,
+ tref = TRef }},
case NoWait of
true -> {noreply, State1};
false -> {reply, #'confirm.select_ok'{}, State1}