diff options
| -rw-r--r-- | src/rabbit_channel.erl | 161 |
1 files changed, 86 insertions, 75 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index fe2cbbda51..bebfc9b736 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -38,7 +38,7 @@ -export([start_link/6, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). --export([emit_stats/1, flush/1]). +-export([emit_stats/1, flush/1, flush_multiple_acks/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]). @@ -152,6 +152,10 @@ emit_stats(Pid) -> flush(Pid) -> gen_server2:call(Pid, flush). +flush_multiple_acks(Pid) -> + gen_server2:cast(Pid, multiple_ack_flush). + + %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> @@ -175,11 +179,10 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> blocking = dict:new(), queue_collector_pid = CollectorPid, stats_timer = rabbit_event:init_stats_timer(), - confirm = #confirm{ enabled = false, - count = 0, - multiple = false, - tref = not_started, - held_acks = gb_sets:new()}}, + confirm = #confirm{enabled = false, + count = 0, + multiple = false, + held_acks = gb_sets:new()}}, rabbit_event:notify( channel_created, [{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]), @@ -246,7 +249,19 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg}, handle_cast(emit_stats, State) -> internal_emit_stats(State), - {noreply, State}. + {noreply, State}; + +handle_cast(multiple_ack_flush, + State = #ch{writer_pid = WriterPid, + confirm = C = #confirm{held_acks = As}}) -> + rabbit_log:info("channel got a multiple_ack_flush message~n" + "held acks: ~p~n", [gb_sets:to_list(As)]), + case gb_sets:is_empty(As) of + true -> ok; + false -> flush_multiple(As, WriterPid) + end, + {noreply, State#ch{confirm = C#confirm{held_acks = gb_sets:new(), + tref = undefined}}}. handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, State = #ch{writer_pid = WriterPid}) -> @@ -256,41 +271,7 @@ handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> erase_queue_stats(QPid), - {noreply, queue_blocked(QPid, State)}; -handle_info(multiple_ack_flush, - State = #ch{ writer_pid = WriterPid, - confirm = C = #confirm{held_acks = As} } ) -> - rabbit_log:info("channel got a multiple_ack_flush message~n" - "held acks: ~p~n", [gb_sets:to_list(As)]), - case gb_sets:is_empty(As) of - true -> ok; - false -> flush_multiple(As, WriterPid) - end, - {noreply, State#ch{confirm = C#confirm{ held_acks = gb_sets:new()}}}. - -flush_multiple(Acks, WriterPid) -> - [First | Rest] = gb_sets:to_list(Acks), - flush_multiple(First, Rest, WriterPid). - -flush_multiple(Prev, [Cur | Rest], WriterPid) -> - ExpNext = Prev+1, - case Cur of - ExpNext -> - flush_multiple(Cur, Rest, WriterPid); - _ -> - flush_multiple(Prev, [], WriterPid), - lists:foreach(fun(A) -> - ok = rabbit_writer:send_command( - WriterPid, - #'basic.ack'{delivery_tag = A}) - end, [Cur | Rest]) - end; -flush_multiple(Prev, [], WriterPid) -> - ok = rabbit_writer:send_command( - WriterPid, - #'basic.ack'{ delivery_tag = Prev, - multiple = true }). - + {noreply, queue_blocked(QPid, State)}. handle_pre_hibernate(State) -> ok = clear_permission_cache(), @@ -434,21 +415,24 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. -handle_confirm(C = #confirm{ enabled = false }, _, _) -> - C; -handle_confirm(C = #confirm{ count = Count, multiple = false }, false, WriterPid) -> +handle_confirm(State = #ch{confirm = #confirm{enabled = false}}, _) -> + State; +handle_confirm(State = #ch{writer_pid = WriterPid, + confirm = C = #confirm{ count = Count, multiple = false}}, + false) -> rabbit_log:info("handling confirm in single transient mode (#~p)~n", [Count]), ok = rabbit_writer:send_command(WriterPid, #'basic.ack'{ delivery_tag = Count }), - C#confirm{ count = Count+1 }; -handle_confirm(C = #confirm{ count = Count, - multiple = true, - held_acks = As }, false, _WriterPid) -> + State#ch{confirm = C#confirm{ count = Count+1 }}; +handle_confirm(State = #ch{confirm = C = #confirm{count = Count, + multiple = true, + held_acks = As}}, false) -> rabbit_log:info("handling confirm in multiple transient mode (#~p)~n", [Count]), - C#confirm{ count = Count+1, - held_acks = gb_sets:add(Count, As) }; -handle_confirm(C = #confirm{ count = Count }, IsPersistent, _WriterPid) -> + State1 = start_ack_timer(State), + State1#ch{confirm = C#confirm{count = Count+1, + held_acks = gb_sets:add(Count, As)}}; +handle_confirm(State = #ch{confirm = C = #confirm{count = Count}}, IsPersistent) -> rabbit_log:info("handling confirm (#~p, persistent = ~p)~n", [Count, IsPersistent]), - C#confirm{ count = Count+1 }. + State#ch{confirm = C#confirm{count = Count+1}}. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -474,8 +458,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, immediate = Immediate}, Content, State = #ch{virtual_host = VHostPath, transaction_id = TxnKey, - writer_pid = WriterPid, - confirm = Confirm}) -> + writer_pid = WriterPid}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), @@ -483,7 +466,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), IsPersistent = is_message_persistent(DecodedContent), - Confirm1 = handle_confirm(Confirm, IsPersistent, WriterPid), + State1 = handle_confirm(State, IsPersistent), Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = DecodedContent, @@ -500,10 +483,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, end, maybe_incr_stats([{ExchangeName, 1} | [{{QPid, ExchangeName}, 1} || - QPid <- DeliveredQPids]], publish, State), + QPid <- DeliveredQPids]], publish, State1), {noreply, case TxnKey of - none -> State#ch{confirm = Confirm1}; - _ -> add_tx_participants(DeliveredQPids, State) + none -> State1; + _ -> add_tx_participants(DeliveredQPids, State1) end}; handle_method(#'basic.ack'{delivery_tag = DeliveryTag, @@ -926,14 +909,8 @@ handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, State = #ch{confirm = C = #confirm{enabled = false}}) -> rabbit_log:info("got confirm.select{multiple = ~p, nowait = ~p}~n", [Multiple, NoWait]), - {ok, TRef} = case Multiple of - false -> {ok, not_started}; - true -> timer:send_interval(?MULTIPLE_ACK_FLUSH_INTERVAL, - multiple_ack_flush) - end, - State1 = State#ch{confirm = C#confirm{ enabled = true, - multiple = Multiple, - tref = TRef }}, + State1 = State#ch{confirm = C#confirm{enabled = true, + multiple = Multiple}}, case NoWait of true -> {noreply, State1}; false -> {reply, #'confirm.select_ok'{}, State1} @@ -1195,14 +1172,8 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, false -> rabbit_writer:send_command(WriterPid, M, Content) end. -terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid, - confirm = #confirm {tref = TRef}}) -> - case TRef of - not_started -> ok; - _ -> - rabbit_log:info("Canceling multiple ack timer: ~p~n", [TRef]), - {ok, cancel} = timer:cancel(TRef) - end, +terminate(State = #ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) -> + stop_ack_timer(State), pg_local:leave(rabbit_channels, self()), rabbit_event:notify(channel_closed, [{pid, self()}]), rabbit_writer:shutdown(WriterPid), @@ -1284,3 +1255,43 @@ erase_queue_stats(QPid) -> erase({queue_stats, QPid}), [erase({queue_exchange_stats, QX}) || {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0]. + +start_ack_timer(State = #ch{confirm = C = #confirm{tref = undefined}}) -> + rabbit_log:info("starting ack timer...~n"), + {ok, TRef} = timer:apply_after(?MULTIPLE_ACK_FLUSH_INTERVAL, + ?MODULE, flush_multiple_acks, [self()]), + State#ch{confirm = C#confirm{tref = TRef}}; +start_ack_timer(State) -> + rabbit_log:info("timer already started.. nop~n"), + State. + +stop_ack_timer(State = #ch{confirm = #confirm{tref = undefined}}) -> + rabbit_log:info("stopping a stopped ack timer.. nop~n"), + State; +stop_ack_timer(State = #ch{confirm = C = #confirm{tref = TRef}}) -> + rabbit_log:info("canceling ack timer: ~p~n", [TRef]), + {ok, cancel} = timer:cancel(TRef), + State#ch{confirm = C#confirm{tref = undefined}}. + +flush_multiple(Acks, WriterPid) -> + [First | Rest] = gb_sets:to_list(Acks), + flush_multiple(First, Rest, WriterPid). + +flush_multiple(Prev, [Cur | Rest], WriterPid) -> + ExpNext = Prev+1, + case Cur of + ExpNext -> + flush_multiple(Cur, Rest, WriterPid); + _ -> + flush_multiple(Prev, [], WriterPid), + lists:foreach(fun(A) -> + ok = rabbit_writer:send_command( + WriterPid, + #'basic.ack'{delivery_tag = A}) + end, [Cur | Rest]) + end; +flush_multiple(Prev, [], WriterPid) -> + ok = rabbit_writer:send_command( + WriterPid, + #'basic.ack'{ delivery_tag = Prev, + multiple = true }). |
