summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_channel.erl93
1 files changed, 64 insertions, 29 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 21d4ff2a4c..0571e45449 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -49,7 +49,7 @@
username, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
confirm}).
--record(confirm, {enabled, count, multiple, tref, held_acks}).
+-record(confirm, {enabled, count, multiple, tref, held_acks, need_acking}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -187,7 +187,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
confirm = #confirm{enabled = false,
count = 0,
multiple = false,
- held_acks = gb_sets:new()}},
+ held_acks = gb_sets:new(),
+ need_acking = gb_sets:new()}},
rabbit_event:notify(
channel_created,
[{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]),
@@ -258,12 +259,13 @@ handle_cast(emit_stats, State) ->
handle_cast(multiple_ack_flush,
State = #ch{writer_pid = WriterPid,
- confirm = C = #confirm{held_acks = As}}) ->
+ confirm = C = #confirm{held_acks = As,
+ need_acking = NA}}) ->
rabbit_log:info("channel got a multiple_ack_flush message~n"
"held acks: ~p~n", [gb_sets:to_list(As)]),
case gb_sets:is_empty(As) of
true -> ok; % this should never be the case
- false -> flush_multiple(As, WriterPid)
+ false -> flush_multiple(As, WriterPid, gb_sets:smallest(NA))
end,
{noreply, State#ch{confirm = C#confirm{held_acks = gb_sets:new(),
tref = undefined}}};
@@ -428,15 +430,37 @@ send_or_enqueue_ack(undefined, State) ->
send_or_enqueue_ack(_, State = #ch{confirm = #confirm{enabled = false}}) ->
State;
send_or_enqueue_ack(MsgSeqNo,
- State = #ch{writer_pid = WriterPid,
- confirm = #confirm{multiple = false}}) ->
+ State = #ch{confirm = #confirm{multiple = false}}) ->
rabbit_log:info("handling confirm in single mode (#~p)~n", [MsgSeqNo]),
- ok = rabbit_writer:send_command(WriterPid, #'basic.ack'{delivery_tag = MsgSeqNo}),
- State;
+ do_if_not_dup(MsgSeqNo, State,
+ fun(MSN, S = #ch{writer_pid = WriterPid}) ->
+ rabbit_log:info("confirm #~p is not a dup!~n", [MSN]),
+ ok = rabbit_writer:send_command(
+ WriterPid, #'basic.ack'{delivery_tag = MSN}),
+ S
+ end);
send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm = #confirm{multiple = true}}) ->
rabbit_log:info("handling confirm in multiple mode (#~p)~n", [MsgSeqNo]),
- State1 = #ch{confirm = C = #confirm{held_acks = As}} = start_ack_timer(State),
- State1#ch{confirm = C#confirm{held_acks = gb_sets:add(MsgSeqNo, As)}}.
+ do_if_not_dup(MsgSeqNo, State,
+ fun(MSN, S) ->
+ rabbit_log:info("confirm #~p is not a dup!~n", [MSN]),
+ State1
+ = #ch{confirm = C = #confirm{held_acks = As}}
+ = start_ack_timer(S),
+ State1#ch{confirm =
+ C#confirm{held_acks =
+ gb_sets:add(MSN, As)}}
+ end).
+
+do_if_not_dup(MsgSeqNo, State = #ch{confirm = #confirm{need_acking = NA}}, Fun) ->
+ case gb_sets:is_element(MsgSeqNo, NA) of
+ true ->
+ State1 = #ch{confirm = C} = Fun(MsgSeqNo, State),
+ State1#ch{confirm = C#confirm{need_acking = gb_sets:delete(MsgSeqNo, NA)}};
+ false ->
+ State
+ end.
+
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
{reply, #'channel.open_ok'{}, State#ch{state = running}};
@@ -477,16 +501,22 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
{undefined, State};
true ->
Count = State#ch.confirm#confirm.count,
+
+ Confirm = State#ch.confirm,
+ Confirm1 = Confirm#confirm{
+ need_acking = gb_sets:add(Count,
+ Confirm#confirm.need_acking)},
+
{CountOrUndefined, NewState} =
case IsPersistent of
- true -> {Count, State};
+ true -> {Count, State#ch{confirm = Confirm1}};
false -> {undefined,
send_or_enqueue_ack(
- Count, State)}
+ Count, State#ch{confirm = Confirm1})}
end,
- Confirm = NewState#ch.confirm,
+ Confirm2 = NewState#ch.confirm,
{CountOrUndefined,
- NewState#ch{confirm = Confirm#confirm{count = Count+1}}}
+ NewState#ch{confirm = Confirm2#confirm{count = Count+1}}}
end,
Message = #basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
@@ -1297,25 +1327,30 @@ stop_ack_timer(State = #ch{confirm = C = #confirm{tref = TRef}}) ->
{ok, cancel} = timer:cancel(TRef),
State#ch{confirm = C#confirm{tref = undefined}}.
-flush_multiple(Acks, WriterPid) ->
+flush_multiple(Acks, WriterPid, SmallestNotAcked) ->
[First | Rest] = gb_sets:to_list(Acks),
- flush_multiple(First, Rest, WriterPid).
+ Remaining = case Rest of
+ [] -> [First];
+ _ -> flush_multiple(First, Rest, WriterPid, SmallestNotAcked)
+ end,
+ lists:foreach(fun(A) ->
+ ok = rabbit_writer:send_command(
+ WriterPid,
+ #'basic.ack'{delivery_tag = A})
+ end, Remaining).
-flush_multiple(Prev, [Cur | Rest], WriterPid) ->
+flush_multiple(Prev, [Cur | Rest], WriterPid, SNA) ->
ExpNext = Prev+1,
- case Cur of
- ExpNext ->
- flush_multiple(Cur, Rest, WriterPid);
+ case {SNA >= Cur, Cur} of
+ {true, ExpNext} ->
+ flush_multiple(Cur, Rest, WriterPid, SNA);
_ ->
- flush_multiple(Prev, [], WriterPid),
- lists:foreach(fun(A) ->
- ok = rabbit_writer:send_command(
- WriterPid,
- #'basic.ack'{delivery_tag = A})
- end, [Cur | Rest])
+ flush_multiple(Prev, [], WriterPid, SNA),
+ [Cur | Rest]
end;
-flush_multiple(Prev, [], WriterPid) ->
+flush_multiple(Prev, [], WriterPid, _) ->
ok = rabbit_writer:send_command(
WriterPid,
- #'basic.ack'{ delivery_tag = Prev,
- multiple = true }).
+ #'basic.ack'{delivery_tag = Prev,
+ multiple = true}),
+ [].