diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 42 |
1 files changed, 21 insertions, 21 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a1db2ccfd6..8df8a97476 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -38,7 +38,7 @@ -export([start_link/7, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). --export([emit_stats/1, flush/1, flush_multiple_acks/1, confirm/2]). +-export([emit_stats/1, flush/1, flush_confirms/1, confirm/2]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, @@ -72,7 +72,7 @@ -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). --define(FLUSH_MULTIPLE_ACKS_INTERVAL, 1000). +-define(FLUSH_CONFIRMS_INTERVAL, 1000). %%---------------------------------------------------------------------------- @@ -103,7 +103,7 @@ -spec(info_all/0 :: () -> [rabbit_types:infos()]). -spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(emit_stats/1 :: (pid()) -> 'ok'). --spec(flush_multiple_acks/1 :: (pid()) -> 'ok'). +-spec(flush_confirms/1 :: (pid()) -> 'ok'). -spec(confirm/2 ::(pid(), non_neg_integer()) -> 'ok'). -endif. @@ -159,8 +159,8 @@ emit_stats(Pid) -> flush(Pid) -> gen_server2:call(Pid, flush). -flush_multiple_acks(Pid) -> - gen_server2:cast(Pid, flush_multiple_acks). +flush_confirms(Pid) -> + gen_server2:cast(Pid, flush_confirms). confirm(Pid, MsgSeqNo) -> gen_server2:cast(Pid, {confirm, MsgSeqNo, self()}). @@ -291,11 +291,11 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, hibernate}; -handle_cast(flush_multiple_acks, State) -> - {noreply, flush_multiple(State)}; +handle_cast(flush_confirms, State) -> + {noreply, internal_flush_confirms(State)}; handle_cast({confirm, MsgSeqNo, From}, State) -> - {noreply, send_or_enqueue_ack(MsgSeqNo, From, State)}. + {noreply, confirm(MsgSeqNo, From, State)}. handle_info({'DOWN', _MRef, process, QPid, _Reason}, State = #ch{queues_for_msg = QFM}) -> @@ -303,7 +303,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) -> Qs = sets:del_element(QPid, QPids), case sets:size(Qs) of - 0 -> send_or_enqueue_ack(Msg, QPid, State0); + 0 -> confirm(Msg, QPid, State0); _ -> State0#ch{queues_for_msg = dict:store(Msg, Qs, QFM0)} end @@ -313,7 +313,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), - State1 = flush_multiple(State), + State1 = internal_flush_confirms(State), rabbit_event:if_enabled(StatsTimer, fun () -> internal_emit_stats( @@ -465,11 +465,11 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. -send_or_enqueue_ack(undefined, _QPid, State) -> +confirm(undefined, _QPid, State) -> State; -send_or_enqueue_ack(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) -> +confirm(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) -> State; -send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) -> +confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) -> do_if_unconfirmed(MsgSeqNo, QPid, fun(MSN, State1 = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command( @@ -477,7 +477,7 @@ send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) -> delivery_tag = MSN}), State1 end, State); -send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) -> +confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) -> do_if_unconfirmed(MsgSeqNo, QPid, fun(MSN, State1 = #ch{held_confirms = As}) -> start_confirm_timer( @@ -1231,12 +1231,12 @@ is_message_persistent(Content) -> process_routing_result(unroutable, _, MsgSeqNo, Message, State) -> ok = basic_return(Message, State#ch.writer_pid, no_route), - send_or_enqueue_ack(MsgSeqNo, undefined, State); + confirm(MsgSeqNo, undefined, State); process_routing_result(not_delivered, _, MsgSeqNo, Message, State) -> ok = basic_return(Message, State#ch.writer_pid, no_consumers), - send_or_enqueue_ack(MsgSeqNo, undefined, State); + confirm(MsgSeqNo, undefined, State); process_routing_result(routed, [], MsgSeqNo, _, State) -> - send_or_enqueue_ack(MsgSeqNo, undefined, State); + confirm(MsgSeqNo, undefined, State); process_routing_result(routed, _, undefined, _, State) -> State; process_routing_result(routed, QPids, MsgSeqNo, _, @@ -1339,8 +1339,8 @@ erase_queue_stats(QPid) -> {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0]. start_confirm_timer(State = #ch{confirm_tref = undefined}) -> - {ok, TRef} = timer:apply_after(?FLUSH_MULTIPLE_ACKS_INTERVAL, - ?MODULE, flush_multiple_acks, [self()]), + {ok, TRef} = timer:apply_after(?FLUSH_CONFIRMS_INTERVAL, + ?MODULE, flush_confirms, [self()]), State#ch{confirm_tref = TRef}; start_confirm_timer(State) -> State. @@ -1351,8 +1351,8 @@ stop_confirm_timer(State = #ch{confirm_tref = TRef}) -> {ok, cancel} = timer:cancel(TRef), State#ch{confirm_tref = undefined}. -flush_multiple(State = #ch{writer_pid = WriterPid, - held_confirms = Cs}) -> +internal_flush_confirms(State = #ch{writer_pid = WriterPid, + held_confirms = Cs}) -> case gb_sets:is_empty(Cs) of true -> State#ch{confirm_tref = undefined}; false -> [First | Rest] = gb_sets:to_list(Cs), |
