diff options
| -rw-r--r-- | src/rabbit_channel.erl | 45 |
1 files changed, 21 insertions, 24 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 85ace25b74..14f85d59af 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -283,8 +283,8 @@ handle_cast(flush_multiple_acks, held_confirms = As, need_confirming = NA}) -> handle_multiple_flush(WriterPid, As, NA), - {noreply, State #ch { held_confirms = gb_sets:new(), - confirm_tref = undefined }}; + {noreply, State#ch{held_confirms = gb_sets:new(), + confirm_tref = undefined}}; handle_cast({confirm, MsgSeqNo}, State) -> {noreply, send_or_enqueue_ack(MsgSeqNo, State)}; @@ -302,25 +302,25 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, S = gb_sets:fold(fun (MsgSeqNo, State0) -> send_or_enqueue_ack(MsgSeqNo, State0) end, State, Msgs), - S #ch {qpid_to_msgs = dict:erase(QPid, QTM)}; + S #ch{qpid_to_msgs = dict:erase(QPid, QTM)}; error -> State end, erase_queue_stats(QPid), {noreply, queue_blocked(QPid, State1)}. -handle_pre_hibernate(State = #ch { writer_pid = WriterPid, - held_confirms = As, - stats_timer = StatsTimer, - need_confirming = NA }) -> +handle_pre_hibernate(State = #ch{writer_pid = WriterPid, + held_confirms = As, + stats_timer = StatsTimer, + need_confirming = NA}) -> ok = clear_permission_cache(), handle_multiple_flush(WriterPid, As, NA), rabbit_event:if_enabled(StatsTimer, fun() -> internal_emit_stats(State) end), - {hibernate, State #ch { held_confirms = gb_sets:new(), - stats_timer = rabbit_event:stop_stats_timer(StatsTimer), - confirm_tref = undefined }}. + {hibernate, State#ch{held_confirms = gb_sets:new(), + stats_timer = rabbit_event:stop_stats_timer(StatsTimer), + confirm_tref = undefined}}. terminate(_Reason, State = #ch{state = terminating}) -> terminate(State); @@ -487,14 +487,12 @@ send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = true}) -> msg_sent_to_queues(MsgSeqNo, QPid, State = #ch{qpid_to_msgs = QTM}) -> case dict:find(QPid, QTM) of {ok, Msgs} -> - State #ch {qpid_to_msgs = dict:store(QPid, - gb_sets:add(MsgSeqNo, Msgs), - QTM) }; + State#ch{ + qpid_to_msgs = dict:store(QPid, gb_sets:add(MsgSeqNo, Msgs), QTM)}; error -> erlang:monitor(process, QPid), - State #ch { qpid_to_msgs = dict:store(QPid, - gb_sets:add(MsgSeqNo, gb_sets:new()), - QTM) } + State#ch{ + qpid_to_msgs = dict:store(QPid, gb_sets:add(MsgSeqNo, gb_sets:new()), QTM)} end. do_if_not_dup(MsgSeqNo, State = #ch{need_confirming = NA}, Fun) -> @@ -546,10 +544,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, true -> Count = State#ch.published_count, {Count, - State #ch { published_count = Count + 1, - need_confirming = - gb_sets:add(Count, - State#ch.need_confirming) }} + State#ch{published_count = Count + 1, + need_confirming = + gb_sets:add(Count, State#ch.need_confirming) }} end, Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, @@ -1008,8 +1005,8 @@ handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, State = #ch{confirm_enabled = false}) -> rabbit_log:info("got confirm.select{multiple = ~p, nowait = ~p}~n", [Multiple, NoWait]), - State1 = State #ch { confirm_enabled = true, - confirm_multiple = Multiple }, + State1 = State#ch{confirm_enabled = true, + confirm_multiple = Multiple}, case NoWait of true -> {noreply, State1}; false -> {reply, #'confirm.select_ok'{}, State1} @@ -1359,7 +1356,7 @@ erase_queue_stats(QPid) -> start_ack_timer(State = #ch{confirm_tref = undefined}) -> {ok, TRef} = timer:apply_after(?FLUSH_MULTIPLE_ACKS_INTERVAL, ?MODULE, flush_multiple_acks, [self()]), - State #ch { confirm_tref = TRef }; + State#ch{confirm_tref = TRef}; start_ack_timer(State) -> State. @@ -1367,7 +1364,7 @@ stop_ack_timer(State = #ch{confirm_tref = undefined}) -> State; stop_ack_timer(State = #ch{confirm_tref = TRef}) -> {ok, cancel} = timer:cancel(TRef), - State #ch { confirm_tref = undefined }. + State#ch{confirm_tref = undefined}. handle_multiple_flush(WriterPid, As, NA) -> case gb_sets:is_empty(As) of |
