diff options
| -rw-r--r-- | src/rabbit_channel.erl | 93 |
1 files changed, 64 insertions, 29 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 21d4ff2a4c..0571e45449 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -49,7 +49,7 @@ username, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, confirm}). --record(confirm, {enabled, count, multiple, tref, held_acks}). +-record(confirm, {enabled, count, multiple, tref, held_acks, need_acking}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -187,7 +187,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> confirm = #confirm{enabled = false, count = 0, multiple = false, - held_acks = gb_sets:new()}}, + held_acks = gb_sets:new(), + need_acking = gb_sets:new()}}, rabbit_event:notify( channel_created, [{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]), @@ -258,12 +259,13 @@ handle_cast(emit_stats, State) -> handle_cast(multiple_ack_flush, State = #ch{writer_pid = WriterPid, - confirm = C = #confirm{held_acks = As}}) -> + confirm = C = #confirm{held_acks = As, + need_acking = NA}}) -> rabbit_log:info("channel got a multiple_ack_flush message~n" "held acks: ~p~n", [gb_sets:to_list(As)]), case gb_sets:is_empty(As) of true -> ok; % this should never be the case - false -> flush_multiple(As, WriterPid) + false -> flush_multiple(As, WriterPid, gb_sets:smallest(NA)) end, {noreply, State#ch{confirm = C#confirm{held_acks = gb_sets:new(), tref = undefined}}}; @@ -428,15 +430,37 @@ send_or_enqueue_ack(undefined, State) -> send_or_enqueue_ack(_, State = #ch{confirm = #confirm{enabled = false}}) -> State; send_or_enqueue_ack(MsgSeqNo, - State = #ch{writer_pid = WriterPid, - confirm = #confirm{multiple = false}}) -> + State = #ch{confirm = #confirm{multiple = false}}) -> rabbit_log:info("handling confirm in single mode (#~p)~n", [MsgSeqNo]), - ok = rabbit_writer:send_command(WriterPid, #'basic.ack'{delivery_tag = MsgSeqNo}), - State; + do_if_not_dup(MsgSeqNo, State, + fun(MSN, S = #ch{writer_pid = WriterPid}) -> + rabbit_log:info("confirm #~p is not a dup!~n", [MSN]), + ok = rabbit_writer:send_command( + WriterPid, #'basic.ack'{delivery_tag = MSN}), + S + end); send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm = #confirm{multiple = true}}) -> rabbit_log:info("handling confirm in multiple mode (#~p)~n", [MsgSeqNo]), - State1 = #ch{confirm = C = #confirm{held_acks = As}} = start_ack_timer(State), - State1#ch{confirm = C#confirm{held_acks = gb_sets:add(MsgSeqNo, As)}}. + do_if_not_dup(MsgSeqNo, State, + fun(MSN, S) -> + rabbit_log:info("confirm #~p is not a dup!~n", [MSN]), + State1 + = #ch{confirm = C = #confirm{held_acks = As}} + = start_ack_timer(S), + State1#ch{confirm = + C#confirm{held_acks = + gb_sets:add(MSN, As)}} + end). + +do_if_not_dup(MsgSeqNo, State = #ch{confirm = #confirm{need_acking = NA}}, Fun) -> + case gb_sets:is_element(MsgSeqNo, NA) of + true -> + State1 = #ch{confirm = C} = Fun(MsgSeqNo, State), + State1#ch{confirm = C#confirm{need_acking = gb_sets:delete(MsgSeqNo, NA)}}; + false -> + State + end. + handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -477,16 +501,22 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, {undefined, State}; true -> Count = State#ch.confirm#confirm.count, + + Confirm = State#ch.confirm, + Confirm1 = Confirm#confirm{ + need_acking = gb_sets:add(Count, + Confirm#confirm.need_acking)}, + {CountOrUndefined, NewState} = case IsPersistent of - true -> {Count, State}; + true -> {Count, State#ch{confirm = Confirm1}}; false -> {undefined, send_or_enqueue_ack( - Count, State)} + Count, State#ch{confirm = Confirm1})} end, - Confirm = NewState#ch.confirm, + Confirm2 = NewState#ch.confirm, {CountOrUndefined, - NewState#ch{confirm = Confirm#confirm{count = Count+1}}} + NewState#ch{confirm = Confirm2#confirm{count = Count+1}}} end, Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, @@ -1297,25 +1327,30 @@ stop_ack_timer(State = #ch{confirm = C = #confirm{tref = TRef}}) -> {ok, cancel} = timer:cancel(TRef), State#ch{confirm = C#confirm{tref = undefined}}. -flush_multiple(Acks, WriterPid) -> +flush_multiple(Acks, WriterPid, SmallestNotAcked) -> [First | Rest] = gb_sets:to_list(Acks), - flush_multiple(First, Rest, WriterPid). + Remaining = case Rest of + [] -> [First]; + _ -> flush_multiple(First, Rest, WriterPid, SmallestNotAcked) + end, + lists:foreach(fun(A) -> + ok = rabbit_writer:send_command( + WriterPid, + #'basic.ack'{delivery_tag = A}) + end, Remaining). -flush_multiple(Prev, [Cur | Rest], WriterPid) -> +flush_multiple(Prev, [Cur | Rest], WriterPid, SNA) -> ExpNext = Prev+1, - case Cur of - ExpNext -> - flush_multiple(Cur, Rest, WriterPid); + case {SNA >= Cur, Cur} of + {true, ExpNext} -> + flush_multiple(Cur, Rest, WriterPid, SNA); _ -> - flush_multiple(Prev, [], WriterPid), - lists:foreach(fun(A) -> - ok = rabbit_writer:send_command( - WriterPid, - #'basic.ack'{delivery_tag = A}) - end, [Cur | Rest]) + flush_multiple(Prev, [], WriterPid, SNA), + [Cur | Rest] end; -flush_multiple(Prev, [], WriterPid) -> +flush_multiple(Prev, [], WriterPid, _) -> ok = rabbit_writer:send_command( WriterPid, - #'basic.ack'{ delivery_tag = Prev, - multiple = true }). + #'basic.ack'{delivery_tag = Prev, + multiple = true}), + []. |
