summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_channel.erl45
1 files changed, 21 insertions, 24 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 85ace25b74..14f85d59af 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -283,8 +283,8 @@ handle_cast(flush_multiple_acks,
held_confirms = As,
need_confirming = NA}) ->
handle_multiple_flush(WriterPid, As, NA),
- {noreply, State #ch { held_confirms = gb_sets:new(),
- confirm_tref = undefined }};
+ {noreply, State#ch{held_confirms = gb_sets:new(),
+ confirm_tref = undefined}};
handle_cast({confirm, MsgSeqNo}, State) ->
{noreply, send_or_enqueue_ack(MsgSeqNo, State)};
@@ -302,25 +302,25 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
S = gb_sets:fold(fun (MsgSeqNo, State0) ->
send_or_enqueue_ack(MsgSeqNo, State0)
end, State, Msgs),
- S #ch {qpid_to_msgs = dict:erase(QPid, QTM)};
+ S #ch{qpid_to_msgs = dict:erase(QPid, QTM)};
error ->
State
end,
erase_queue_stats(QPid),
{noreply, queue_blocked(QPid, State1)}.
-handle_pre_hibernate(State = #ch { writer_pid = WriterPid,
- held_confirms = As,
- stats_timer = StatsTimer,
- need_confirming = NA }) ->
+handle_pre_hibernate(State = #ch{writer_pid = WriterPid,
+ held_confirms = As,
+ stats_timer = StatsTimer,
+ need_confirming = NA}) ->
ok = clear_permission_cache(),
handle_multiple_flush(WriterPid, As, NA),
rabbit_event:if_enabled(StatsTimer, fun() ->
internal_emit_stats(State)
end),
- {hibernate, State #ch { held_confirms = gb_sets:new(),
- stats_timer = rabbit_event:stop_stats_timer(StatsTimer),
- confirm_tref = undefined }}.
+ {hibernate, State#ch{held_confirms = gb_sets:new(),
+ stats_timer = rabbit_event:stop_stats_timer(StatsTimer),
+ confirm_tref = undefined}}.
terminate(_Reason, State = #ch{state = terminating}) ->
terminate(State);
@@ -487,14 +487,12 @@ send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = true}) ->
msg_sent_to_queues(MsgSeqNo, QPid, State = #ch{qpid_to_msgs = QTM}) ->
case dict:find(QPid, QTM) of
{ok, Msgs} ->
- State #ch {qpid_to_msgs = dict:store(QPid,
- gb_sets:add(MsgSeqNo, Msgs),
- QTM) };
+ State#ch{
+ qpid_to_msgs = dict:store(QPid, gb_sets:add(MsgSeqNo, Msgs), QTM)};
error ->
erlang:monitor(process, QPid),
- State #ch { qpid_to_msgs = dict:store(QPid,
- gb_sets:add(MsgSeqNo, gb_sets:new()),
- QTM) }
+ State#ch{
+ qpid_to_msgs = dict:store(QPid, gb_sets:add(MsgSeqNo, gb_sets:new()), QTM)}
end.
do_if_not_dup(MsgSeqNo, State = #ch{need_confirming = NA}, Fun) ->
@@ -546,10 +544,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
true ->
Count = State#ch.published_count,
{Count,
- State #ch { published_count = Count + 1,
- need_confirming =
- gb_sets:add(Count,
- State#ch.need_confirming) }}
+ State#ch{published_count = Count + 1,
+ need_confirming =
+ gb_sets:add(Count, State#ch.need_confirming) }}
end,
Message = #basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
@@ -1008,8 +1005,8 @@ handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait},
State = #ch{confirm_enabled = false}) ->
rabbit_log:info("got confirm.select{multiple = ~p, nowait = ~p}~n",
[Multiple, NoWait]),
- State1 = State #ch { confirm_enabled = true,
- confirm_multiple = Multiple },
+ State1 = State#ch{confirm_enabled = true,
+ confirm_multiple = Multiple},
case NoWait of
true -> {noreply, State1};
false -> {reply, #'confirm.select_ok'{}, State1}
@@ -1359,7 +1356,7 @@ erase_queue_stats(QPid) ->
start_ack_timer(State = #ch{confirm_tref = undefined}) ->
{ok, TRef} = timer:apply_after(?FLUSH_MULTIPLE_ACKS_INTERVAL,
?MODULE, flush_multiple_acks, [self()]),
- State #ch { confirm_tref = TRef };
+ State#ch{confirm_tref = TRef};
start_ack_timer(State) ->
State.
@@ -1367,7 +1364,7 @@ stop_ack_timer(State = #ch{confirm_tref = undefined}) ->
State;
stop_ack_timer(State = #ch{confirm_tref = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
- State #ch { confirm_tref = undefined }.
+ State#ch{confirm_tref = undefined}.
handle_multiple_flush(WriterPid, As, NA) ->
case gb_sets:is_empty(As) of