summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-04-05 17:43:32 +0100
committerkjnilsson <knilsson@pivotal.io>2019-04-23 14:36:10 +0100
commitd389d045df3f60bb19511d3240f3cee7c1de534c (patch)
tree77370a074a006a4bbdbec92c8e93a9aadefa2b5d /src
parent04e4d0b5ba486abb16e873b4d3df6e8792b009e6 (diff)
downloadrabbitmq-server-git-d389d045df3f60bb19511d3240f3cee7c1de534c.tar.gz
Refactor channel state
To put static fields into a nested record to avoid some copying on update. Also exit the channel when a basic.get times out. [#164212469]
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl608
1 files changed, 326 insertions, 282 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 32579050db..a4d3ec6321 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -81,92 +81,96 @@
%% Mgmt HTTP API refactor
-export([handle_method/6]).
--record(ch, {
- %% starting | running | flow | closing
- state,
- %% same as reader's protocol. Used when instantiating
- %% (protocol) exceptions.
- protocol,
- %% channel number
- channel,
- %% reader process
- reader_pid,
- %% writer process
- writer_pid,
- %%
- conn_pid,
- %% same as reader's name, see #v1.name
- %% in rabbit_reader
- conn_name,
- %% channel's originating source e.g. rabbit_reader | rabbit_direct | undefined
- %% or any other channel creating/spawning entity
- source,
- %% limiter pid, see rabbit_limiter
- limiter,
- %% none | {Msgs, Acks} | committing | failed |
- tx,
- %% (consumer) delivery tag sequence
- next_tag,
- %% messages pending consumer acknowledgement
- unacked_message_q,
- %% same as #v1.user in the reader, used in
- %% authorisation checks
- user,
- %% same as #v1.user in the reader
- virtual_host,
- %% when queue.bind's queue field is empty,
- %% this name will be used instead
- most_recently_declared_queue,
- %% a map of queue ref to queue name
- queue_names,
- %% queue processes are monitored to update
- %% queue names
- queue_monitors,
- %% a map of consumer tags to
- %% consumer details: #amqqueue record, acknowledgement mode,
- %% consumer exclusivity, etc
- consumer_mapping,
- %% a map of queue pids to consumer tag lists
- queue_consumers,
- %% a set of pids of queues that have unacknowledged
- %% deliveries
- delivering_queues,
- %% when a queue is declared as exclusive, queue
- %% collector must be notified.
- %% see rabbit_queue_collector for more info.
- queue_collector_pid,
- %% timer used to emit statistics
- stats_timer,
- %% are publisher confirms enabled for this channel?
- confirm_enabled,
- %% publisher confirm delivery tag sequence
- publish_seqno,
- %% a dtree used to track unconfirmed
- %% (to publishers) messages
- unconfirmed,
- %% a list of tags for published messages that were
- %% delivered but are yet to be confirmed to the client
- confirmed,
- %% a list of tags for published messages that were
- %% rejected but are yet to be sent to the client
- rejected,
- %% same as capabilities in the reader
- capabilities,
- %% tracing exchange resource if tracing is enabled,
- %% 'none' otherwise
- trace_state,
- consumer_prefetch,
- %% used by "one shot RPC" (amq.
- reply_consumer,
- %% flow | noflow, see rabbitmq-server#114
- delivery_flow,
- interceptor_state,
- queue_states,
- queue_cleanup_timer,
- %% Message content size limit
- max_message_size,
- consumer_timeout
-}).
+-record(conf, {
+ %% starting | running | flow | closing
+ state,
+ %% same as reader's protocol. Used when instantiating
+ %% (protocol) exceptions.
+ protocol,
+ %% channel number
+ channel,
+ %% reader process
+ reader_pid,
+ %% writer process
+ writer_pid,
+ %%
+ conn_pid,
+ %% same as reader's name, see #v1.name
+ %% in rabbit_reader
+ conn_name,
+ %% channel's originating source e.g. rabbit_reader | rabbit_direct | undefined
+ %% or any other channel creating/spawning entity
+ source,
+ %% same as #v1.user in the reader, used in
+ %% authorisation checks
+ user,
+ %% same as #v1.user in the reader
+ virtual_host,
+ %% when queue.bind's queue field is empty,
+ %% this name will be used instead
+ most_recently_declared_queue,
+ %% when a queue is declared as exclusive, queue
+ %% collector must be notified.
+ %% see rabbit_queue_collector for more info.
+ queue_collector_pid,
+
+ %% same as capabilities in the reader
+ capabilities,
+ %% tracing exchange resource if tracing is enabled,
+ %% 'none' otherwise
+ trace_state,
+ consumer_prefetch,
+ %% Message content size limit
+ max_message_size,
+ consumer_timeout
+ }).
+
+-record(ch, {cfg :: #conf{},
+ %% limiter state, see rabbit_limiter
+ limiter,
+ %% none | {Msgs, Acks} | committing | failed |
+ tx,
+ %% (consumer) delivery tag sequence
+ next_tag,
+ %% messages pending consumer acknowledgement
+ unacked_message_q,
+ %% a map of queue ref to queue name
+ queue_names,
+ %% queue processes are monitored to update
+ %% queue names
+ queue_monitors,
+ %% a map of consumer tags to
+ %% consumer details: #amqqueue record, acknowledgement mode,
+ %% consumer exclusivity, etc
+ consumer_mapping,
+ %% a map of queue pids to consumer tag lists
+ queue_consumers,
+ %% a set of pids of queues that have unacknowledged
+ %% deliveries
+ delivering_queues,
+ %% timer used to emit statistics
+ stats_timer,
+ %% are publisher confirms enabled for this channel?
+ confirm_enabled,
+ %% publisher confirm delivery tag sequence
+ publish_seqno,
+ %% a dtree used to track unconfirmed
+ %% (to publishers) messages
+ unconfirmed,
+ %% a list of tags for published messages that were
+ %% delivered but are yet to be confirmed to the client
+ confirmed,
+ %% a list of tags for published messages that were
+ %% rejected but are yet to be sent to the client
+ rejected,
+ %% used by "one shot RPC" (amq.
+ reply_consumer,
+ %% flow | noflow, see rabbitmq-server#114
+ delivery_flow,
+ interceptor_state,
+ queue_states,
+ queue_cleanup_timer
+ }).
-define(QUEUE, lqueue).
@@ -485,40 +489,43 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
end,
MaxMessageSize = get_max_message_size(),
ConsumerTimeout = get_consumer_timeout(),
- State = #ch{state = starting,
- protocol = Protocol,
- channel = Channel,
- reader_pid = ReaderPid,
- writer_pid = WriterPid,
- conn_pid = ConnPid,
- conn_name = ConnName,
- limiter = Limiter,
+ rabbit_log:info("consumer timeout ~w", [ConsumerTimeout]),
+ State = #ch{cfg = #conf{state = starting,
+ protocol = Protocol,
+ channel = Channel,
+ reader_pid = ReaderPid,
+ writer_pid = WriterPid,
+ conn_pid = ConnPid,
+ conn_name = ConnName,
+ user = User,
+ virtual_host = VHost,
+ most_recently_declared_queue = <<>>,
+ queue_collector_pid = CollectorPid,
+ capabilities = Capabilities,
+ trace_state = rabbit_trace:init(VHost),
+ consumer_prefetch = Prefetch,
+ max_message_size = MaxMessageSize,
+ consumer_timeout = ConsumerTimeout
+ },
+ limiter = Limiter,
tx = none,
next_tag = 1,
unacked_message_q = ?QUEUE:new(),
- user = User,
- virtual_host = VHost,
- most_recently_declared_queue = <<>>,
queue_names = #{},
queue_monitors = pmon:new(),
consumer_mapping = #{},
queue_consumers = #{},
delivering_queues = sets:new(),
- queue_collector_pid = CollectorPid,
confirm_enabled = false,
publish_seqno = 1,
unconfirmed = dtree:empty(),
rejected = [],
confirmed = [],
- capabilities = Capabilities,
- trace_state = rabbit_trace:init(VHost),
- consumer_prefetch = Prefetch,
reply_consumer = none,
delivery_flow = Flow,
interceptor_state = undefined,
- queue_states = #{},
- max_message_size = MaxMessageSize,
- consumer_timeout = ConsumerTimeout},
+ queue_states = #{}
+ },
State1 = State#ch{
interceptor_state = rabbit_channel_interceptor:init(State)},
State2 = rabbit_event:init_stats_timer(State1, #ch.stats_timer),
@@ -571,8 +578,9 @@ handle_call({{info, Items}, Deadline}, _From, State) ->
reply({error, Error}, State)
end;
-handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) ->
- reply(ok, State#ch{trace_state = rabbit_trace:init(VHost)});
+handle_call(refresh_config, _From,
+ State = #ch{cfg = #conf{virtual_host = VHost} = Cfg}) ->
+ reply(ok, State#ch{cfg = Cfg#conf{trace_state = rabbit_trace:init(VHost)}});
handle_call(refresh_interceptors, _From, State) ->
IState = rabbit_channel_interceptor:init(State),
@@ -593,7 +601,7 @@ handle_call(_Request, _From, State) ->
noreply(State).
handle_cast({method, Method, Content, Flow},
- State = #ch{reader_pid = Reader,
+ State = #ch{cfg = #conf{reader_pid = Reader},
interceptor_state = IState}) ->
case Flow of
%% We are going to process a message from the rabbit_reader
@@ -620,12 +628,13 @@ handle_cast({method, Method, Content, Flow},
{stop, {Reason, erlang:get_stacktrace()}, State}
end;
-handle_cast(ready_for_close, State = #ch{state = closing,
- writer_pid = WriterPid}) ->
+handle_cast(ready_for_close,
+ State = #ch{cfg = #conf{state = closing,
+ writer_pid = WriterPid}}) ->
ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}),
{stop, normal, State};
-handle_cast(terminate, State = #ch{writer_pid = WriterPid}) ->
+handle_cast(terminate, State = #ch{cfg = #conf{writer_pid = WriterPid}}) ->
ok = rabbit_writer:flush(WriterPid),
{stop, normal, State};
@@ -637,12 +646,14 @@ handle_cast({command, Msg}, State) ->
ok = send(Msg, State),
noreply(State);
-handle_cast({deliver, _CTag, _AckReq, _Msg}, State = #ch{state = closing}) ->
+handle_cast({deliver, _CTag, _AckReq, _Msg},
+ State = #ch{cfg = #conf{state = closing}}) ->
noreply(State);
handle_cast({deliver, ConsumerTag, AckRequired, Msg}, State) ->
noreply(handle_deliver(ConsumerTag, AckRequired, Msg, State));
-handle_cast({deliver_reply, _K, _Del}, State = #ch{state = closing}) ->
+handle_cast({deliver_reply, _K, _Del},
+ State = #ch{cfg = #conf{state = closing}}) ->
noreply(State);
handle_cast({deliver_reply, _K, _Del}, State = #ch{reply_consumer = none}) ->
noreply(State);
@@ -650,8 +661,8 @@ handle_cast({deliver_reply, Key, #delivery{message =
#basic_message{exchange_name = ExchangeName,
routing_keys = [RoutingKey | _CcRoutes],
content = Content}}},
- State = #ch{writer_pid = WriterPid,
- next_tag = DeliveryTag,
+ State = #ch{cfg = #conf{writer_pid = WriterPid},
+ next_tag = DeliveryTag,
reply_consumer = {ConsumerTag, _Suffix, Key}}) ->
ok = rabbit_writer:send_command(
WriterPid,
@@ -665,12 +676,14 @@ handle_cast({deliver_reply, Key, #delivery{message =
handle_cast({deliver_reply, _K1, _}, State=#ch{reply_consumer = {_, _, _K2}}) ->
noreply(State);
-handle_cast({send_credit_reply, Len}, State = #ch{writer_pid = WriterPid}) ->
+handle_cast({send_credit_reply, Len},
+ State = #ch{cfg = #conf{writer_pid = WriterPid}}) ->
ok = rabbit_writer:send_command(
WriterPid, #'basic.credit_ok'{available = Len}),
noreply(State);
-handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) ->
+handle_cast({send_drained, CTagCredit},
+ State = #ch{cfg = #conf{writer_pid = WriterPid}}) ->
[ok = rabbit_writer:send_command(
WriterPid, #'basic.credit_drained'{consumer_tag = ConsumerTag,
credit_drained = CreditDrained})
@@ -737,7 +750,7 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
{internal, MsgSeqNos, Actions, QState1} ->
State = State0#ch{queue_states = maps:put(Name, QState1, QueueStates)},
%% execute actions
- WriterPid = State#ch.writer_pid,
+ WriterPid = State#ch.cfg#conf.writer_pid,
lists:foreach(fun ({send_credit_reply, Avail}) ->
ok = rabbit_writer:send_command(
WriterPid,
@@ -808,64 +821,79 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
-handle_info({{Ref, Node}, LateAnswer}, State = #ch{channel = Channel})
+handle_info({{Ref, Node}, LateAnswer},
+ State = #ch{cfg = #conf{channel = Channel}})
when is_reference(Ref) ->
rabbit_log_channel:warning("Channel ~p ignoring late answer ~p from ~p",
[Channel, LateAnswer, Node]),
noreply(State);
-handle_info(queue_cleanup, State0 = #ch{channel = Channel,
+handle_info(queue_cleanup, State0 = #ch{cfg = #conf{channel = Channel,
+ consumer_timeout = Timeout},
queue_states = QueueStates0,
queue_names = QNames,
queue_consumers = QCons,
- consumer_timeout = Timeout,
unacked_message_q = UAMQ}) ->
QueueStates1 =
maps:filter(fun(_, QS) ->
QName = rabbit_quorum_queue:queue_name(QS),
[] /= rabbit_amqqueue:lookup(QName)
end, QueueStates0),
+
Now = os:system_time(millisecond),
case ?QUEUE:peek(UAMQ) of
{value, {_DTag, ConsumerTag, Time, {QPid, _Msg}}}
- when is_integer(Timeout) andalso Time < Now - Timeout ->
- rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out "
- "waiting on ack", [ConsumerTag, Channel]),
- %% Cancel consumer both to the client and queue
- %% and return all messages to the queue
- QRef = qpid_to_ref(QPid),
- QName = maps:get(QRef, QNames),
- %% cancel the consumer with the client
- State1 = cancel_consumer(ConsumerTag, QName, State0),
- [Q] = rabbit_amqqueue:lookup([QName]),
- %% send basic cancel to the queue
- {ok, QueueStates2} = rabbit_amqqueue:basic_cancel(
- Q, self(), ConsumerTag, undefined,
- <<"broker">>, QueueStates1),
- %% return all in-flight messages
- {MsgIds, Rem} = lists:foldl(
- fun({_DelTag, ConTag, _Time, {_, MsgId}},
- {Ids, Rem})
- when ConTag == ConsumerTag ->
- {[MsgId | Ids], Rem};
- (Unacked, {Ids, Rem}) ->
- {Ids, ?QUEUE:in(Unacked, Rem)}
- end, {[], ?QUEUE:new()},
- ?QUEUE:to_list(UAMQ)),
- QueueStates = rabbit_amqqueue:requeue(QPid, {ConsumerTag, MsgIds},
- self(), QueueStates2),
-
- State = State1#ch{queue_states = QueueStates,
- queue_consumers = maps:remove(QRef, QCons),
- unacked_message_q = Rem},
- noreply(init_queue_cleanup_timer(State));
+ when is_integer(Timeout)
+ andalso Time < Now - Timeout ->
+ case ConsumerTag of
+ _ when is_integer(ConsumerTag) ->
+ %% basic.get - there is no mechanims so we just crash the
+ %% channel
+ Ex = rabbit_misc:amqp_error(precondition_failed,
+ "basic.get ack timed out on channel ~w",
+ [Channel], none),
+ handle_exception(Ex, State0);
+ % rabbit_misc:protocol_error(precondition_failed,
+ % "basic.get ack timed out on channel ~w ",
+ % [Channel]);
+ _ ->
+ rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out "
+ "waiting on ack",
+ [rabbit_data_coercion:to_binary(ConsumerTag),
+ Channel]),
+ QRef = qpid_to_ref(QPid),
+ QName = maps:get(QRef, QNames),
+ %% cancel the consumer with the client
+ State1 = cancel_consumer(ConsumerTag, QName, State0),
+ [Q] = rabbit_amqqueue:lookup([QName]),
+ %% send basic cancel to the queue
+ {ok, QueueStates2} = rabbit_amqqueue:basic_cancel(
+ Q, self(), ConsumerTag, undefined,
+ <<"broker">>, QueueStates1),
+ %% return all in-flight messages for the consumer
+ {MsgIds, Rem} = lists:foldl(
+ fun({_DelTag, ConTag, _Time, {_, MsgId}},
+ {Ids, Rem}) when ConTag == ConsumerTag ->
+ {[MsgId | Ids], Rem};
+ (Unacked, {Ids, Rem}) ->
+ {Ids, ?QUEUE:in(Unacked, Rem)}
+ end, {[], ?QUEUE:new()},
+ ?QUEUE:to_list(UAMQ)),
+ QueueStates = rabbit_amqqueue:requeue(QPid, {ConsumerTag, MsgIds},
+ self(), QueueStates2),
+
+ State = State1#ch{queue_states = QueueStates,
+ queue_consumers = maps:remove(QRef, QCons),
+ unacked_message_q = Rem},
+ noreply(init_queue_cleanup_timer(State))
+ end;
_ ->
noreply(
init_queue_cleanup_timer(
State0#ch{queue_states = QueueStates1}))
end;
-handle_info({channel_source, Source}, State = #ch{}) ->
- noreply(State#ch{source = Source}).
+handle_info({channel_source, Source}, State = #ch{cfg = Cfg}) ->
+ noreply(State#ch{cfg = Cfg#conf{source = Source}}).
handle_pre_hibernate(State) ->
ok = clear_permission_cache(),
@@ -877,7 +905,8 @@ handle_pre_hibernate(State) ->
end),
{hibernate, rabbit_event:stop_stats_timer(State, #ch.stats_timer)}.
-terminate(_Reason, State = #ch{user = #user{username = Username}}) ->
+terminate(_Reason,
+ State = #ch{cfg = #conf{user = #user{username = Username}}}) ->
{_Res, _State1} = notify_queues(State),
pg_local:leave(rabbit_channels, self()),
rabbit_event:if_enabled(State, #ch.stats_timer,
@@ -930,22 +959,23 @@ return_ok(State, false, Msg) -> {reply, Msg, State}.
ok_msg(true, _Msg) -> undefined;
ok_msg(false, Msg) -> Msg.
-send(_Command, #ch{state = closing}) ->
+send(_Command, #ch{cfg = #conf{state = closing}}) ->
ok;
-send(Command, #ch{writer_pid = WriterPid}) ->
+send(Command, #ch{cfg = #conf{writer_pid = WriterPid}}) ->
ok = rabbit_writer:send_command(WriterPid, Command).
format_soft_error(#amqp_error{name = N, explanation = E, method = M}) ->
io_lib:format("operation ~s caused a channel exception ~s: ~ts", [M, N, E]).
-handle_exception(Reason, State = #ch{protocol = Protocol,
- channel = Channel,
- writer_pid = WriterPid,
- reader_pid = ReaderPid,
- conn_pid = ConnPid,
- conn_name = ConnName,
- virtual_host = VHost,
- user = User}) ->
+handle_exception(Reason, State = #ch{cfg = #conf{protocol = Protocol,
+ channel = Channel,
+ writer_pid = WriterPid,
+ reader_pid = ReaderPid,
+ conn_pid = ConnPid,
+ conn_name = ConnName,
+ virtual_host = VHost,
+ user = User
+ }}) ->
%% something bad's happened: notify_queues may not be 'ok'
{_Result, State1} = notify_queues(State),
case rabbit_binary_generator:map_exception(Channel, Reason, Protocol) of
@@ -972,11 +1002,12 @@ precondition_failed(Format, Params) ->
rabbit_misc:protocol_error(precondition_failed, Format, Params).
return_queue_declare_ok(#resource{name = ActualName},
- NoWait, MessageCount, ConsumerCount, State) ->
- return_ok(State#ch{most_recently_declared_queue = ActualName}, NoWait,
- #'queue.declare_ok'{queue = ActualName,
- message_count = MessageCount,
- consumer_count = ConsumerCount}).
+ NoWait, MessageCount, ConsumerCount,
+ #ch{cfg = Cfg} = State) ->
+ return_ok(State#ch{cfg = Cfg#conf{most_recently_declared_queue = ActualName}},
+ NoWait, #'queue.declare_ok'{queue = ActualName,
+ message_count = MessageCount,
+ consumer_count = ConsumerCount}).
check_resource_access(User, Resource, Perm) ->
V = {Resource, Perm},
@@ -1014,15 +1045,15 @@ check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) ->
check_user_id_header(#'P_basic'{user_id = undefined}, _) ->
ok;
check_user_id_header(#'P_basic'{user_id = Username},
- #ch{user = #user{username = Username}}) ->
+ #ch{cfg = #conf{user = #user{username = Username}}}) ->
ok;
check_user_id_header(
- #'P_basic'{}, #ch{user = #user{authz_backends =
- [{rabbit_auth_backend_dummy, _}]}}) ->
+ #'P_basic'{}, #ch{cfg = #conf{user = #user{authz_backends =
+ [{rabbit_auth_backend_dummy, _}]}}}) ->
ok;
check_user_id_header(#'P_basic'{user_id = Claimed},
- #ch{user = #user{username = Actual,
- tags = Tags}}) ->
+ #ch{cfg = #conf{user = #user{username = Actual,
+ tags = Tags}}}) ->
case lists:member(impersonator, Tags) of
true -> ok;
false -> precondition_failed(
@@ -1131,18 +1162,18 @@ qbin_to_resource(QueueNameBin, VHostPath) ->
name_to_resource(Type, NameBin, VHostPath) ->
rabbit_misc:r(VHostPath, Type, NameBin).
-expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) ->
+expand_queue_name_shortcut(<<>>, #ch{cfg = #conf{most_recently_declared_queue = <<>>}}) ->
rabbit_misc:protocol_error(not_found, "no previously declared queue", []);
-expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = MRDQ}) ->
+expand_queue_name_shortcut(<<>>, #ch{cfg = #conf{most_recently_declared_queue = MRDQ}}) ->
MRDQ;
expand_queue_name_shortcut(QueueNameBin, _) ->
QueueNameBin.
expand_routing_key_shortcut(<<>>, <<>>,
- #ch{most_recently_declared_queue = <<>>}) ->
+ #ch{cfg = #conf{most_recently_declared_queue = <<>>}}) ->
rabbit_misc:protocol_error(not_found, "no previously declared queue", []);
expand_routing_key_shortcut(<<>>, <<>>,
- #ch{most_recently_declared_queue = MRDQ}) ->
+ #ch{cfg = #conf{most_recently_declared_queue = MRDQ}}) ->
MRDQ;
expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) ->
RoutingKey.
@@ -1230,9 +1261,10 @@ record_confirms(MXs, State = #ch{confirmed = C}) ->
handle_method({Method, Content}, State) ->
handle_method(Method, Content, State).
-handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
+handle_method(#'channel.open'{}, _,
+ State = #ch{cfg = #conf{state = starting} = Cfg}) ->
%% Don't leave "starting" as the state for 5s. TODO is this TRTTD?
- State1 = State#ch{state = running},
+ State1 = State#ch{cfg = Cfg#conf{state = running}},
rabbit_event:if_enabled(State1, #ch.stats_timer,
fun() -> emit_stats(State1) end),
{reply, #'channel.open_ok'{}, State1};
@@ -1241,21 +1273,23 @@ handle_method(#'channel.open'{}, _, _State) ->
rabbit_misc:protocol_error(
channel_error, "second 'channel.open' seen", []);
-handle_method(_Method, _, #ch{state = starting}) ->
+handle_method(_Method, _, #ch{cfg = #conf{state = starting}}) ->
rabbit_misc:protocol_error(channel_error, "expected 'channel.open'", []);
-handle_method(#'channel.close_ok'{}, _, #ch{state = closing}) ->
+handle_method(#'channel.close_ok'{}, _, #ch{cfg = #conf{state = closing}}) ->
stop;
-handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid,
- state = closing}) ->
+handle_method(#'channel.close'{}, _,
+ State = #ch{cfg = #conf{state = closing,
+ writer_pid = WriterPid}}) ->
ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}),
{noreply, State};
-handle_method(_Method, _, State = #ch{state = closing}) ->
+handle_method(_Method, _, State = #ch{cfg = #conf{state = closing}}) ->
{noreply, State};
-handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) ->
+handle_method(#'channel.close'{}, _,
+ State = #ch{cfg = #conf{reader_pid = ReaderPid}}) ->
{_Result, State1} = notify_queues(State),
%% We issue the channel.close_ok response after a handshake with
%% the reader, the other half of which is ready_for_close. That
@@ -1287,17 +1321,19 @@ handle_method(#'basic.publish'{immediate = true}, _Content, _State) ->
handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory},
- Content, State = #ch{virtual_host = VHostPath,
+ Content, State = #ch{cfg = #conf{channel = ChannelNum,
+ conn_pid = ConnPid,
+ source = ChSrc,
+ conn_name = ConnName,
+ virtual_host = VHostPath,
+ user = #user{username = Username} = User,
+ trace_state = TraceState,
+ max_message_size = MaxMessageSize
+ },
tx = Tx,
- channel = ChannelNum,
confirm_enabled = ConfirmEnabled,
- trace_state = TraceState,
- user = #user{username = Username} = User,
- conn_name = ConnName,
- delivery_flow = Flow,
- conn_pid = ConnPid,
- source = ChSrc,
- max_message_size = MaxMessageSize}) ->
+ delivery_flow = Flow
+ }) ->
check_msg_size(Content, MaxMessageSize),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, User),
@@ -1352,12 +1388,13 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
end};
handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck},
- _, State = #ch{writer_pid = WriterPid,
- conn_pid = ConnPid,
- limiter = Limiter,
+ _, State = #ch{cfg = #conf{writer_pid = WriterPid,
+ conn_pid = ConnPid,
+ user = User,
+ virtual_host = VHostPath
+ },
+ limiter = Limiter,
next_tag = DeliveryTag,
- user = User,
- virtual_host = VHostPath,
queue_states = QueueStates0}) ->
QueueName = qbin_to_resource(QueueNameBin, VHostPath),
check_read_permitted(QueueName, User),
@@ -1437,10 +1474,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
exclusive = ExclusiveConsume,
nowait = NoWait,
arguments = Args},
- _, State = #ch{consumer_prefetch = ConsumerPrefetch,
- consumer_mapping = ConsumerMapping,
- user = User,
- virtual_host = VHostPath}) ->
+ _, State = #ch{cfg = #conf{consumer_prefetch = ConsumerPrefetch,
+ user = User,
+ virtual_host = VHostPath},
+ consumer_mapping = ConsumerMapping
+ }) ->
case maps:find(ConsumerTag, ConsumerMapping) of
error ->
QueueName = qbin_to_resource(QueueNameBin, VHostPath),
@@ -1472,9 +1510,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
end;
handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait},
- _, State = #ch{consumer_mapping = ConsumerMapping,
+ _, State = #ch{cfg = #conf{user = #user{username = Username}},
+ consumer_mapping = ConsumerMapping,
queue_consumers = QCons,
- user = #user{username = Username},
queue_states = QueueStates0}) ->
OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag},
case maps:find(ConsumerTag, ConsumerMapping) of
@@ -1522,10 +1560,11 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
handle_method(#'basic.qos'{global = false,
prefetch_count = PrefetchCount},
- _, State = #ch{limiter = Limiter}) ->
+ _, State = #ch{cfg = Cfg,
+ limiter = Limiter}) ->
%% Ensures that if default was set, it's overridden
Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter),
- {reply, #'basic.qos_ok'{}, State#ch{consumer_prefetch = PrefetchCount,
+ {reply, #'basic.qos_ok'{}, State#ch{cfg = Cfg#conf{consumer_prefetch = PrefetchCount},
limiter = Limiter1}};
handle_method(#'basic.qos'{global = true,
@@ -1583,87 +1622,87 @@ handle_method(#'basic.reject'{delivery_tag = DeliveryTag, requeue = Requeue},
reject(DeliveryTag, Requeue, false, State);
handle_method(#'exchange.declare'{nowait = NoWait} = Method,
- _, State = #ch{virtual_host = VHostPath,
- user = User,
- queue_collector_pid = CollectorPid,
- conn_pid = ConnPid,
- source = ChSrc}) ->
+ _, State = #ch{cfg = #conf{virtual_host = VHostPath,
+ user = User,
+ queue_collector_pid = CollectorPid,
+ conn_pid = ConnPid,
+ source = ChSrc}}) ->
handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.declare_ok'{});
handle_method(#'exchange.delete'{nowait = NoWait} = Method,
- _, State = #ch{conn_pid = ConnPid,
- source = ChSrc,
- virtual_host = VHostPath,
- queue_collector_pid = CollectorPid,
- user = User}) ->
+ _, State = #ch{cfg = #conf{conn_pid = ConnPid,
+ source = ChSrc,
+ virtual_host = VHostPath,
+ queue_collector_pid = CollectorPid,
+ user = User}}) ->
handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.delete_ok'{});
handle_method(#'exchange.bind'{nowait = NoWait} = Method,
- _, State = #ch{virtual_host = VHostPath,
- conn_pid = ConnPid,
- source = ChSrc,
- queue_collector_pid = CollectorPid,
- user = User}) ->
+ _, State = #ch{cfg = #conf{virtual_host = VHostPath,
+ conn_pid = ConnPid,
+ source = ChSrc,
+ queue_collector_pid = CollectorPid,
+ user = User}}) ->
handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.bind_ok'{});
handle_method(#'exchange.unbind'{nowait = NoWait} = Method,
- _, State = #ch{virtual_host = VHostPath,
- conn_pid = ConnPid,
- source = ChSrc,
- queue_collector_pid = CollectorPid,
- user = User}) ->
+ _, State = #ch{cfg = #conf{virtual_host = VHostPath,
+ conn_pid = ConnPid,
+ source = ChSrc,
+ queue_collector_pid = CollectorPid,
+ user = User}}) ->
handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.unbind_ok'{});
handle_method(#'queue.declare'{nowait = NoWait} = Method,
- _, State = #ch{virtual_host = VHostPath,
- conn_pid = ConnPid,
- source = ChSrc,
- queue_collector_pid = CollectorPid,
- user = User}) ->
+ _, State = #ch{cfg = #conf{virtual_host = VHostPath,
+ conn_pid = ConnPid,
+ source = ChSrc,
+ queue_collector_pid = CollectorPid,
+ user = User}}) ->
{ok, QueueName, MessageCount, ConsumerCount} =
handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_queue_declare_ok(QueueName, NoWait, MessageCount,
ConsumerCount, State);
handle_method(#'queue.delete'{nowait = NoWait} = Method, _,
- State = #ch{conn_pid = ConnPid,
- source = ChSrc,
- virtual_host = VHostPath,
- queue_collector_pid = CollectorPid,
- user = User}) ->
+ State = #ch{cfg = #conf{conn_pid = ConnPid,
+ source = ChSrc,
+ virtual_host = VHostPath,
+ queue_collector_pid = CollectorPid,
+ user = User}}) ->
{ok, PurgedMessageCount} =
handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait,
#'queue.delete_ok'{message_count = PurgedMessageCount});
handle_method(#'queue.bind'{nowait = NoWait} = Method, _,
- State = #ch{conn_pid = ConnPid,
- source = ChSrc,
- user = User,
- queue_collector_pid = CollectorPid,
- virtual_host = VHostPath}) ->
+ State = #ch{cfg = #conf{conn_pid = ConnPid,
+ source = ChSrc,
+ user = User,
+ queue_collector_pid = CollectorPid,
+ virtual_host = VHostPath}}) ->
handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'queue.bind_ok'{});
handle_method(#'queue.unbind'{} = Method, _,
- State = #ch{conn_pid = ConnPid,
- source = ChSrc,
- user = User,
- queue_collector_pid = CollectorPid,
- virtual_host = VHostPath}) ->
+ State = #ch{cfg = #conf{conn_pid = ConnPid,
+ source = ChSrc,
+ user = User,
+ queue_collector_pid = CollectorPid,
+ virtual_host = VHostPath}}) ->
handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, false, #'queue.unbind_ok'{});
handle_method(#'queue.purge'{nowait = NoWait} = Method,
- _, State = #ch{conn_pid = ConnPid,
- source = ChSrc,
- user = User,
- queue_collector_pid = CollectorPid,
- virtual_host = VHostPath}) ->
+ _, State = #ch{cfg = #conf{conn_pid = ConnPid,
+ source = ChSrc,
+ user = User,
+ queue_collector_pid = CollectorPid,
+ virtual_host = VHostPath}}) ->
case handle_method(Method, ConnPid, ChSrc, CollectorPid,
VHostPath, User) of
{ok, PurgedMessageCount} ->
@@ -1742,11 +1781,11 @@ handle_method(_MethodRecord, _Content, _State) ->
%% for why.
basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
ExclusiveConsume, Args, NoWait,
- State = #ch{conn_pid = ConnPid,
- limiter = Limiter,
+ State = #ch{cfg = #conf{conn_pid = ConnPid,
+ user = #user{username = Username}},
+ limiter = Limiter,
consumer_mapping = ConsumerMapping,
- user = #user{username = Username},
- queue_states = QueueStates0}) ->
+ queue_states = QueueStates0}) ->
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
fun (Q) ->
@@ -1866,8 +1905,9 @@ handle_consuming_queue_down_or_eol(QRef,
%% not an HA failover. But the likelihood is not great and most users
%% are unlikely to care.
-cancel_consumer(CTag, QName, State = #ch{capabilities = Capabilities,
- consumer_mapping = CMap}) ->
+cancel_consumer(CTag, QName,
+ State = #ch{cfg = #conf{capabilities = Capabilities},
+ consumer_mapping = CMap}) ->
case rabbit_misc:table_lookup(
Capabilities, <<"consumer_cancel_notify">>) of
{bool, true} -> ok =
@@ -1940,7 +1980,8 @@ binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
basic_return(#basic_message{exchange_name = ExchangeName,
routing_keys = [RoutingKey | _CcRoutes],
content = Content},
- State = #ch{protocol = Protocol, writer_pid = WriterPid},
+ State = #ch{cfg = #conf{protocol = Protocol,
+ writer_pid = WriterPid}},
Reason) ->
?INCR_STATS(exchange_stats, ExchangeName, 1, return_unroutable, State),
{_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason),
@@ -1975,12 +2016,14 @@ internal_reject(Requeue, Acked, Limiter,
record_sent(Type, Tag, AckRequired,
Msg = {QName, QPid, MsgId, Redelivered, _Message},
- State = #ch{unacked_message_q = UAMQ,
- next_tag = DeliveryTag,
- trace_state = TraceState,
- user = #user{username = Username},
- conn_name = ConnName,
- channel = ChannelNum}) ->
+ State = #ch{cfg = #conf{channel = ChannelNum,
+ trace_state = TraceState,
+ user = #user{username = Username},
+ conn_name = ConnName
+ },
+ unacked_message_q = UAMQ,
+ next_tag = DeliveryTag
+ }) ->
?INCR_STATS(queue_stats, QName, 1, case {Type, AckRequired} of
{get, true} -> get;
{get, false} -> get_no_ack;
@@ -2063,9 +2106,10 @@ incr_queue_stats(QPid, QNames, MsgIds, State) ->
%% ack first" order.
new_tx() -> {?QUEUE:new(), []}.
-notify_queues(State = #ch{state = closing}) ->
+notify_queues(State = #ch{cfg = #conf{state = closing}}) ->
{ok, State};
notify_queues(State = #ch{consumer_mapping = Consumers,
+ cfg = Cfg,
delivering_queues = DQ }) ->
QRefs0 = sets:to_list(
sets:union(sets:from_list(consumer_queue_refs(Consumers)), DQ)),
@@ -2073,7 +2117,7 @@ notify_queues(State = #ch{consumer_mapping = Consumers,
QPids = [P || P <- QRefs0, ?IS_CLASSIC(P)],
Timeout = get_operation_timeout(),
{rabbit_amqqueue:notify_down_all(QPids, self(), Timeout),
- State#ch{state = closing}}.
+ State#ch{cfg = Cfg#conf{state = closing}}}.
foreach_per_queue(_F, [], Acc) ->
Acc;
@@ -2223,7 +2267,7 @@ send_confirms_and_nacks(State) ->
send_nacks([], _, State) ->
State;
-send_nacks(_Rs, _, State = #ch{state = closing}) -> %% optimisation
+send_nacks(_Rs, _, State = #ch{cfg = #conf{state = closing}}) -> %% optimisation
State;
send_nacks(Rs, Cs, State) ->
coalesce_and_send(Rs, Cs,
@@ -2234,7 +2278,7 @@ send_nacks(Rs, Cs, State) ->
send_confirms([], _, State) ->
State;
-send_confirms(_Cs, _, State = #ch{state = closing}) -> %% optimisation
+send_confirms(_Cs, _, State = #ch{cfg = #conf{state = closing}}) -> %% optimisation
State;
send_confirms([MsgSeqNo], _, State) ->
ok = send(#'basic.ack'{delivery_tag = MsgSeqNo}, State),
@@ -2299,14 +2343,14 @@ infos(Items, Deadline, State) ->
end || Item <- Items].
i(pid, _) -> self();
-i(connection, #ch{conn_pid = ConnPid}) -> ConnPid;
-i(number, #ch{channel = Channel}) -> Channel;
-i(user, #ch{user = User}) -> User#user.username;
+i(connection, #ch{cfg = #conf{conn_pid = ConnPid}}) -> ConnPid;
+i(number, #ch{cfg = #conf{channel = Channel}}) -> Channel;
+i(user, #ch{cfg = #conf{user = User}}) -> User#user.username;
i(user_who_performed_action, Ch) -> i(user, Ch);
-i(vhost, #ch{virtual_host = VHost}) -> VHost;
+i(vhost, #ch{cfg = #conf{virtual_host = VHost}}) -> VHost;
i(transactional, #ch{tx = Tx}) -> Tx =/= none;
i(confirm, #ch{confirm_enabled = CE}) -> CE;
-i(source, #ch{source = ChSrc}) -> ChSrc;
+i(source, #ch{cfg = #conf{source = ChSrc}}) -> ChSrc;
i(name, State) -> name(State);
i(consumer_count, #ch{consumer_mapping = CM}) -> maps:size(CM);
i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC);
@@ -2315,9 +2359,9 @@ i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> ?QUEUE:len(Msgs);
i(messages_uncommitted, #ch{}) -> 0;
i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks);
i(acks_uncommitted, #ch{}) -> 0;
-i(state, #ch{state = running}) -> credit_flow:state();
-i(state, #ch{state = State}) -> State;
-i(prefetch_count, #ch{consumer_prefetch = C}) -> C;
+i(state, #ch{cfg = #conf{state = running}}) -> credit_flow:state();
+i(state, #ch{cfg = #conf{state = State}}) -> State;
+i(prefetch_count, #ch{cfg = #conf{consumer_prefetch = C}}) -> C;
i(global_prefetch_count, #ch{limiter = Limiter}) ->
rabbit_limiter:get_prefetch_limit(Limiter);
i(interceptors, #ch{interceptor_state = IState}) ->
@@ -2330,7 +2374,7 @@ i(reductions, _State) ->
i(Item, _) ->
throw({bad_argument, Item}).
-name(#ch{conn_name = ConnName, channel = Channel}) ->
+name(#ch{cfg = #conf{conn_name = ConnName, channel = Channel}}) ->
list_to_binary(rabbit_misc:format("~s (~p)", [ConnName, Channel])).
emit_stats(State) -> emit_stats(State, []).
@@ -2353,9 +2397,9 @@ erase_queue_stats(QName) ->
end || {{queue_exchange_stats, QX = {QName0, _}}, _} <- get(),
QName0 =:= QName].
-get_vhost(#ch{virtual_host = VHost}) -> VHost.
+get_vhost(#ch{cfg = #conf{virtual_host = VHost}}) -> VHost.
-get_user(#ch{user = User}) -> User.
+get_user(#ch{cfg = #conf{user = User}}) -> User.
delete_stats({queue_stats, QName}) ->
rabbit_core_metrics:channel_queue_down({self(), QName});
@@ -2615,7 +2659,7 @@ handle_deliver(ConsumerTag, AckRequired,
#basic_message{exchange_name = ExchangeName,
routing_keys = [RoutingKey | _CcRoutes],
content = Content}},
- State = #ch{writer_pid = WriterPid,
+ State = #ch{cfg = #conf{writer_pid = WriterPid},
next_tag = DeliveryTag}) ->
Deliver = #'basic.deliver'{consumer_tag = ConsumerTag,
delivery_tag = DeliveryTag,