summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_channel.erl117
1 files changed, 56 insertions, 61 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 954fb88891..e32ed1c458 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -48,8 +48,8 @@
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
- confirm}).
--record(confirm, {enabled, count, multiple, tref, held_acks, need_acking}).
+ confirm_enabled, published_count, confirm_multiple, confirm_tref,
+ held_confirms, need_confirming}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -167,28 +167,28 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
process_flag(trap_exit, true),
link(WriterPid),
ok = pg_local:join(rabbit_channels, self()),
- State = #ch{state = starting,
- channel = Channel,
- reader_pid = ReaderPid,
- writer_pid = WriterPid,
- limiter_pid = undefined,
- transaction_id = none,
- tx_participants = sets:new(),
- next_tag = 1,
- uncommitted_ack_q = queue:new(),
- unacked_message_q = queue:new(),
- username = Username,
- virtual_host = VHost,
- most_recently_declared_queue = <<>>,
- consumer_mapping = dict:new(),
- blocking = dict:new(),
- queue_collector_pid = CollectorPid,
- stats_timer = rabbit_event:init_stats_timer(),
- confirm = #confirm{enabled = false,
- count = 0,
- multiple = false,
- held_acks = gb_sets:new(),
- need_acking = gb_sets:new()}},
+ State = #ch{ state = starting,
+ channel = Channel,
+ reader_pid = ReaderPid,
+ writer_pid = WriterPid,
+ limiter_pid = undefined,
+ transaction_id = none,
+ tx_participants = sets:new(),
+ next_tag = 1,
+ uncommitted_ack_q = queue:new(),
+ unacked_message_q = queue:new(),
+ username = Username,
+ virtual_host = VHost,
+ most_recently_declared_queue = <<>>,
+ consumer_mapping = dict:new(),
+ blocking = dict:new(),
+ queue_collector_pid = CollectorPid,
+ stats_timer = rabbit_event:init_stats_timer(),
+ confirm_enabled = false,
+ published_count = 0,
+ confirm_multiple = false,
+ held_confirms = gb_sets:new(),
+ need_confirming = gb_sets:new() },
rabbit_event:notify(
channel_created,
[{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]),
@@ -258,17 +258,17 @@ handle_cast(emit_stats, State) ->
{noreply, State};
handle_cast(multiple_ack_flush,
- State = #ch{writer_pid = WriterPid,
- confirm = C = #confirm{held_acks = As,
- need_acking = NA}}) ->
+ State = #ch{writer_pid = WriterPid,
+ held_confirms = As,
+ need_confirming = NA}) ->
rabbit_log:info("channel got a multiple_ack_flush message~n"
"held acks: ~p~n", [gb_sets:to_list(As)]),
case gb_sets:is_empty(As) of
true -> ok; % this should never be the case
false -> flush_multiple(As, WriterPid, gb_sets:smallest(NA))
end,
- {noreply, State#ch{confirm = C#confirm{held_acks = gb_sets:new(),
- tref = undefined}}};
+ {noreply, State #ch { held_confirms = gb_sets:new(),
+ confirm_tref = undefined }};
handle_cast({confirm, MsgSeqNo}, State) ->
{noreply, send_or_enqueue_ack(MsgSeqNo, State)}.
@@ -427,10 +427,10 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
send_or_enqueue_ack(undefined, State) ->
State;
-send_or_enqueue_ack(_, State = #ch{confirm = #confirm{enabled = false}}) ->
+send_or_enqueue_ack(_, State = #ch{confirm_enabled = false}) ->
State;
send_or_enqueue_ack(MsgSeqNo,
- State = #ch{confirm = #confirm{multiple = false}}) ->
+ State = #ch{confirm_multiple = false}) ->
rabbit_log:info("handling confirm in single mode (#~p)~n", [MsgSeqNo]),
do_if_not_dup(MsgSeqNo, State,
fun(MSN, S = #ch{writer_pid = WriterPid}) ->
@@ -438,23 +438,20 @@ send_or_enqueue_ack(MsgSeqNo,
WriterPid, #'basic.ack'{delivery_tag = MSN}),
S
end);
-send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm = #confirm{multiple = true}}) ->
+send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = true}) ->
rabbit_log:info("handling confirm in multiple mode (#~p)~n", [MsgSeqNo]),
do_if_not_dup(MsgSeqNo, State,
fun(MSN, S) ->
- State1
- = #ch{confirm = C = #confirm{held_acks = As}}
- = start_ack_timer(S),
- State1#ch{confirm =
- C#confirm{held_acks =
- gb_sets:add(MSN, As)}}
+ State1 = start_ack_timer(S),
+ State1 #ch { held_confirms =
+ gb_sets:add(MSN, State1#ch.held_confirms) }
end).
-do_if_not_dup(MsgSeqNo, State = #ch{confirm = #confirm{need_acking = NA}}, Fun) ->
+do_if_not_dup(MsgSeqNo, State = #ch{need_confirming = NA}, Fun) ->
case gb_sets:is_element(MsgSeqNo, NA) of
true ->
- State1 = #ch{confirm = C} = Fun(MsgSeqNo, State),
- State1#ch{confirm = C#confirm{need_acking = gb_sets:delete(MsgSeqNo, NA)}};
+ State1 = Fun(MsgSeqNo, State),
+ State1 #ch { need_confirming = gb_sets:delete(MsgSeqNo, NA) };
false ->
State
end.
@@ -494,16 +491,15 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
IsPersistent = is_message_persistent(DecodedContent),
% PubAck transient messages immediately
{MsgSeqNo, State1}
- = case State#ch.confirm#confirm.enabled of
+ = case State#ch.confirm_enabled of
false ->
{undefined, State};
true ->
- Confirm = State#ch.confirm,
- Count = Confirm#confirm.count,
+ Count = State#ch.published_count,
% Add the current message to the need_acking list
- State01 = State#ch{confirm = Confirm#confirm{
- need_acking = gb_sets:add(Count,
- Confirm#confirm.need_acking)}},
+ State01 = State #ch {
+ need_confirming = gb_sets:add(Count,
+ State#ch.need_confirming) },
% Ack transient messages now
{CountOrUndefined, State02} =
case IsPersistent of
@@ -513,9 +509,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
Count, State01)}
end,
% Increase the PubAck counter
- Confirm02 = State02#ch.confirm,
{CountOrUndefined,
- State02#ch{confirm = Confirm02#confirm{count = Count+1}}}
+ State02 #ch { published_count = Count + 1 }}
end,
Message = #basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
@@ -930,7 +925,7 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
#'queue.purge_ok'{message_count = PurgedMessageCount});
-handle_method(#'tx.select'{}, _, #ch{confirm = #confirm{enabled = true}}) ->
+handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) ->
rabbit_misc:protocol_error(
precondition_failed, "a confirm channel cannot be made transactional", []);
@@ -961,11 +956,11 @@ handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId})
handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait},
_,
- State = #ch{confirm = C = #confirm{enabled = false}}) ->
+ State = #ch{confirm_enabled = false}) ->
rabbit_log:info("got confirm.select{multiple = ~p, nowait = ~p}~n",
[Multiple, NoWait]),
- State1 = State#ch{confirm = C#confirm{enabled = true,
- multiple = Multiple}},
+ State1 = State #ch { confirm_enabled = true,
+ confirm_multiple = Multiple },
case NoWait of
true -> {noreply, State1};
false -> {reply, #'confirm.select_ok'{}, State1}
@@ -973,8 +968,8 @@ handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait},
handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait},
_,
- State = #ch{confirm = #confirm{enabled = true,
- multiple = Multiple}}) ->
+ State = #ch{confirm_enabled = true,
+ confirm_multiple = Multiple}) ->
rabbit_log:info("got a confirm.select with same options~n"),
case NoWait of
true -> {noreply, State};
@@ -983,7 +978,7 @@ handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait},
handle_method(#'confirm.select'{},
_,
- #ch{confirm = #confirm{enabled = true}}) ->
+ #ch{confirm_enabled = true}) ->
rabbit_misc:protocol_error(
precondition_failed, "cannot change confirm channel multiple setting", []);
@@ -1311,18 +1306,18 @@ erase_queue_stats(QPid) ->
[erase({queue_exchange_stats, QX}) ||
{{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0].
-start_ack_timer(State = #ch{confirm = C = #confirm{tref = undefined}}) ->
+start_ack_timer(State = #ch{confirm_tref = undefined}) ->
{ok, TRef} = timer:apply_after(?MULTIPLE_ACK_FLUSH_INTERVAL,
?MODULE, flush_multiple_acks, [self()]),
- State#ch{confirm = C#confirm{tref = TRef}};
+ State #ch { confirm_tref = TRef };
start_ack_timer(State) ->
State.
-stop_ack_timer(State = #ch{confirm = #confirm{tref = undefined}}) ->
+stop_ack_timer(State = #ch{confirm_tref = undefined}) ->
State;
-stop_ack_timer(State = #ch{confirm = C = #confirm{tref = TRef}}) ->
+stop_ack_timer(State = #ch{confirm_tref = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
- State#ch{confirm = C#confirm{tref = undefined}}.
+ State #ch { confirm_tref = undefined }.
flush_multiple(Acks, WriterPid, SmallestNotAcked) ->
[First | Rest] = gb_sets:to_list(Acks),