diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-04-05 17:43:32 +0100 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-04-23 14:36:10 +0100 |
| commit | d389d045df3f60bb19511d3240f3cee7c1de534c (patch) | |
| tree | 77370a074a006a4bbdbec92c8e93a9aadefa2b5d /src | |
| parent | 04e4d0b5ba486abb16e873b4d3df6e8792b009e6 (diff) | |
| download | rabbitmq-server-git-d389d045df3f60bb19511d3240f3cee7c1de534c.tar.gz | |
Refactor channel state
To put static fields into a nested record to avoid some copying on
update.
Also exit the channel when a basic.get times out.
[#164212469]
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 608 |
1 files changed, 326 insertions, 282 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 32579050db..a4d3ec6321 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -81,92 +81,96 @@ %% Mgmt HTTP API refactor -export([handle_method/6]). --record(ch, { - %% starting | running | flow | closing - state, - %% same as reader's protocol. Used when instantiating - %% (protocol) exceptions. - protocol, - %% channel number - channel, - %% reader process - reader_pid, - %% writer process - writer_pid, - %% - conn_pid, - %% same as reader's name, see #v1.name - %% in rabbit_reader - conn_name, - %% channel's originating source e.g. rabbit_reader | rabbit_direct | undefined - %% or any other channel creating/spawning entity - source, - %% limiter pid, see rabbit_limiter - limiter, - %% none | {Msgs, Acks} | committing | failed | - tx, - %% (consumer) delivery tag sequence - next_tag, - %% messages pending consumer acknowledgement - unacked_message_q, - %% same as #v1.user in the reader, used in - %% authorisation checks - user, - %% same as #v1.user in the reader - virtual_host, - %% when queue.bind's queue field is empty, - %% this name will be used instead - most_recently_declared_queue, - %% a map of queue ref to queue name - queue_names, - %% queue processes are monitored to update - %% queue names - queue_monitors, - %% a map of consumer tags to - %% consumer details: #amqqueue record, acknowledgement mode, - %% consumer exclusivity, etc - consumer_mapping, - %% a map of queue pids to consumer tag lists - queue_consumers, - %% a set of pids of queues that have unacknowledged - %% deliveries - delivering_queues, - %% when a queue is declared as exclusive, queue - %% collector must be notified. - %% see rabbit_queue_collector for more info. - queue_collector_pid, - %% timer used to emit statistics - stats_timer, - %% are publisher confirms enabled for this channel? - confirm_enabled, - %% publisher confirm delivery tag sequence - publish_seqno, - %% a dtree used to track unconfirmed - %% (to publishers) messages - unconfirmed, - %% a list of tags for published messages that were - %% delivered but are yet to be confirmed to the client - confirmed, - %% a list of tags for published messages that were - %% rejected but are yet to be sent to the client - rejected, - %% same as capabilities in the reader - capabilities, - %% tracing exchange resource if tracing is enabled, - %% 'none' otherwise - trace_state, - consumer_prefetch, - %% used by "one shot RPC" (amq. - reply_consumer, - %% flow | noflow, see rabbitmq-server#114 - delivery_flow, - interceptor_state, - queue_states, - queue_cleanup_timer, - %% Message content size limit - max_message_size, - consumer_timeout -}). +-record(conf, { + %% starting | running | flow | closing + state, + %% same as reader's protocol. Used when instantiating + %% (protocol) exceptions. + protocol, + %% channel number + channel, + %% reader process + reader_pid, + %% writer process + writer_pid, + %% + conn_pid, + %% same as reader's name, see #v1.name + %% in rabbit_reader + conn_name, + %% channel's originating source e.g. rabbit_reader | rabbit_direct | undefined + %% or any other channel creating/spawning entity + source, + %% same as #v1.user in the reader, used in + %% authorisation checks + user, + %% same as #v1.user in the reader + virtual_host, + %% when queue.bind's queue field is empty, + %% this name will be used instead + most_recently_declared_queue, + %% when a queue is declared as exclusive, queue + %% collector must be notified. + %% see rabbit_queue_collector for more info. + queue_collector_pid, + + %% same as capabilities in the reader + capabilities, + %% tracing exchange resource if tracing is enabled, + %% 'none' otherwise + trace_state, + consumer_prefetch, + %% Message content size limit + max_message_size, + consumer_timeout + }). + +-record(ch, {cfg :: #conf{}, + %% limiter state, see rabbit_limiter + limiter, + %% none | {Msgs, Acks} | committing | failed | + tx, + %% (consumer) delivery tag sequence + next_tag, + %% messages pending consumer acknowledgement + unacked_message_q, + %% a map of queue ref to queue name + queue_names, + %% queue processes are monitored to update + %% queue names + queue_monitors, + %% a map of consumer tags to + %% consumer details: #amqqueue record, acknowledgement mode, + %% consumer exclusivity, etc + consumer_mapping, + %% a map of queue pids to consumer tag lists + queue_consumers, + %% a set of pids of queues that have unacknowledged + %% deliveries + delivering_queues, + %% timer used to emit statistics + stats_timer, + %% are publisher confirms enabled for this channel? + confirm_enabled, + %% publisher confirm delivery tag sequence + publish_seqno, + %% a dtree used to track unconfirmed + %% (to publishers) messages + unconfirmed, + %% a list of tags for published messages that were + %% delivered but are yet to be confirmed to the client + confirmed, + %% a list of tags for published messages that were + %% rejected but are yet to be sent to the client + rejected, + %% used by "one shot RPC" (amq. + reply_consumer, + %% flow | noflow, see rabbitmq-server#114 + delivery_flow, + interceptor_state, + queue_states, + queue_cleanup_timer + }). -define(QUEUE, lqueue). @@ -485,40 +489,43 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, end, MaxMessageSize = get_max_message_size(), ConsumerTimeout = get_consumer_timeout(), - State = #ch{state = starting, - protocol = Protocol, - channel = Channel, - reader_pid = ReaderPid, - writer_pid = WriterPid, - conn_pid = ConnPid, - conn_name = ConnName, - limiter = Limiter, + rabbit_log:info("consumer timeout ~w", [ConsumerTimeout]), + State = #ch{cfg = #conf{state = starting, + protocol = Protocol, + channel = Channel, + reader_pid = ReaderPid, + writer_pid = WriterPid, + conn_pid = ConnPid, + conn_name = ConnName, + user = User, + virtual_host = VHost, + most_recently_declared_queue = <<>>, + queue_collector_pid = CollectorPid, + capabilities = Capabilities, + trace_state = rabbit_trace:init(VHost), + consumer_prefetch = Prefetch, + max_message_size = MaxMessageSize, + consumer_timeout = ConsumerTimeout + }, + limiter = Limiter, tx = none, next_tag = 1, unacked_message_q = ?QUEUE:new(), - user = User, - virtual_host = VHost, - most_recently_declared_queue = <<>>, queue_names = #{}, queue_monitors = pmon:new(), consumer_mapping = #{}, queue_consumers = #{}, delivering_queues = sets:new(), - queue_collector_pid = CollectorPid, confirm_enabled = false, publish_seqno = 1, unconfirmed = dtree:empty(), rejected = [], confirmed = [], - capabilities = Capabilities, - trace_state = rabbit_trace:init(VHost), - consumer_prefetch = Prefetch, reply_consumer = none, delivery_flow = Flow, interceptor_state = undefined, - queue_states = #{}, - max_message_size = MaxMessageSize, - consumer_timeout = ConsumerTimeout}, + queue_states = #{} + }, State1 = State#ch{ interceptor_state = rabbit_channel_interceptor:init(State)}, State2 = rabbit_event:init_stats_timer(State1, #ch.stats_timer), @@ -571,8 +578,9 @@ handle_call({{info, Items}, Deadline}, _From, State) -> reply({error, Error}, State) end; -handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) -> - reply(ok, State#ch{trace_state = rabbit_trace:init(VHost)}); +handle_call(refresh_config, _From, + State = #ch{cfg = #conf{virtual_host = VHost} = Cfg}) -> + reply(ok, State#ch{cfg = Cfg#conf{trace_state = rabbit_trace:init(VHost)}}); handle_call(refresh_interceptors, _From, State) -> IState = rabbit_channel_interceptor:init(State), @@ -593,7 +601,7 @@ handle_call(_Request, _From, State) -> noreply(State). handle_cast({method, Method, Content, Flow}, - State = #ch{reader_pid = Reader, + State = #ch{cfg = #conf{reader_pid = Reader}, interceptor_state = IState}) -> case Flow of %% We are going to process a message from the rabbit_reader @@ -620,12 +628,13 @@ handle_cast({method, Method, Content, Flow}, {stop, {Reason, erlang:get_stacktrace()}, State} end; -handle_cast(ready_for_close, State = #ch{state = closing, - writer_pid = WriterPid}) -> +handle_cast(ready_for_close, + State = #ch{cfg = #conf{state = closing, + writer_pid = WriterPid}}) -> ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}), {stop, normal, State}; -handle_cast(terminate, State = #ch{writer_pid = WriterPid}) -> +handle_cast(terminate, State = #ch{cfg = #conf{writer_pid = WriterPid}}) -> ok = rabbit_writer:flush(WriterPid), {stop, normal, State}; @@ -637,12 +646,14 @@ handle_cast({command, Msg}, State) -> ok = send(Msg, State), noreply(State); -handle_cast({deliver, _CTag, _AckReq, _Msg}, State = #ch{state = closing}) -> +handle_cast({deliver, _CTag, _AckReq, _Msg}, + State = #ch{cfg = #conf{state = closing}}) -> noreply(State); handle_cast({deliver, ConsumerTag, AckRequired, Msg}, State) -> noreply(handle_deliver(ConsumerTag, AckRequired, Msg, State)); -handle_cast({deliver_reply, _K, _Del}, State = #ch{state = closing}) -> +handle_cast({deliver_reply, _K, _Del}, + State = #ch{cfg = #conf{state = closing}}) -> noreply(State); handle_cast({deliver_reply, _K, _Del}, State = #ch{reply_consumer = none}) -> noreply(State); @@ -650,8 +661,8 @@ handle_cast({deliver_reply, Key, #delivery{message = #basic_message{exchange_name = ExchangeName, routing_keys = [RoutingKey | _CcRoutes], content = Content}}}, - State = #ch{writer_pid = WriterPid, - next_tag = DeliveryTag, + State = #ch{cfg = #conf{writer_pid = WriterPid}, + next_tag = DeliveryTag, reply_consumer = {ConsumerTag, _Suffix, Key}}) -> ok = rabbit_writer:send_command( WriterPid, @@ -665,12 +676,14 @@ handle_cast({deliver_reply, Key, #delivery{message = handle_cast({deliver_reply, _K1, _}, State=#ch{reply_consumer = {_, _, _K2}}) -> noreply(State); -handle_cast({send_credit_reply, Len}, State = #ch{writer_pid = WriterPid}) -> +handle_cast({send_credit_reply, Len}, + State = #ch{cfg = #conf{writer_pid = WriterPid}}) -> ok = rabbit_writer:send_command( WriterPid, #'basic.credit_ok'{available = Len}), noreply(State); -handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) -> +handle_cast({send_drained, CTagCredit}, + State = #ch{cfg = #conf{writer_pid = WriterPid}}) -> [ok = rabbit_writer:send_command( WriterPid, #'basic.credit_drained'{consumer_tag = ConsumerTag, credit_drained = CreditDrained}) @@ -737,7 +750,7 @@ handle_info({ra_event, {Name, _} = From, _} = Evt, {internal, MsgSeqNos, Actions, QState1} -> State = State0#ch{queue_states = maps:put(Name, QState1, QueueStates)}, %% execute actions - WriterPid = State#ch.writer_pid, + WriterPid = State#ch.cfg#conf.writer_pid, lists:foreach(fun ({send_credit_reply, Avail}) -> ok = rabbit_writer:send_command( WriterPid, @@ -808,64 +821,79 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; -handle_info({{Ref, Node}, LateAnswer}, State = #ch{channel = Channel}) +handle_info({{Ref, Node}, LateAnswer}, + State = #ch{cfg = #conf{channel = Channel}}) when is_reference(Ref) -> rabbit_log_channel:warning("Channel ~p ignoring late answer ~p from ~p", [Channel, LateAnswer, Node]), noreply(State); -handle_info(queue_cleanup, State0 = #ch{channel = Channel, +handle_info(queue_cleanup, State0 = #ch{cfg = #conf{channel = Channel, + consumer_timeout = Timeout}, queue_states = QueueStates0, queue_names = QNames, queue_consumers = QCons, - consumer_timeout = Timeout, unacked_message_q = UAMQ}) -> QueueStates1 = maps:filter(fun(_, QS) -> QName = rabbit_quorum_queue:queue_name(QS), [] /= rabbit_amqqueue:lookup(QName) end, QueueStates0), + Now = os:system_time(millisecond), case ?QUEUE:peek(UAMQ) of {value, {_DTag, ConsumerTag, Time, {QPid, _Msg}}} - when is_integer(Timeout) andalso Time < Now - Timeout -> - rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out " - "waiting on ack", [ConsumerTag, Channel]), - %% Cancel consumer both to the client and queue - %% and return all messages to the queue - QRef = qpid_to_ref(QPid), - QName = maps:get(QRef, QNames), - %% cancel the consumer with the client - State1 = cancel_consumer(ConsumerTag, QName, State0), - [Q] = rabbit_amqqueue:lookup([QName]), - %% send basic cancel to the queue - {ok, QueueStates2} = rabbit_amqqueue:basic_cancel( - Q, self(), ConsumerTag, undefined, - <<"broker">>, QueueStates1), - %% return all in-flight messages - {MsgIds, Rem} = lists:foldl( - fun({_DelTag, ConTag, _Time, {_, MsgId}}, - {Ids, Rem}) - when ConTag == ConsumerTag -> - {[MsgId | Ids], Rem}; - (Unacked, {Ids, Rem}) -> - {Ids, ?QUEUE:in(Unacked, Rem)} - end, {[], ?QUEUE:new()}, - ?QUEUE:to_list(UAMQ)), - QueueStates = rabbit_amqqueue:requeue(QPid, {ConsumerTag, MsgIds}, - self(), QueueStates2), - - State = State1#ch{queue_states = QueueStates, - queue_consumers = maps:remove(QRef, QCons), - unacked_message_q = Rem}, - noreply(init_queue_cleanup_timer(State)); + when is_integer(Timeout) + andalso Time < Now - Timeout -> + case ConsumerTag of + _ when is_integer(ConsumerTag) -> + %% basic.get - there is no mechanims so we just crash the + %% channel + Ex = rabbit_misc:amqp_error(precondition_failed, + "basic.get ack timed out on channel ~w", + [Channel], none), + handle_exception(Ex, State0); + % rabbit_misc:protocol_error(precondition_failed, + % "basic.get ack timed out on channel ~w ", + % [Channel]); + _ -> + rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out " + "waiting on ack", + [rabbit_data_coercion:to_binary(ConsumerTag), + Channel]), + QRef = qpid_to_ref(QPid), + QName = maps:get(QRef, QNames), + %% cancel the consumer with the client + State1 = cancel_consumer(ConsumerTag, QName, State0), + [Q] = rabbit_amqqueue:lookup([QName]), + %% send basic cancel to the queue + {ok, QueueStates2} = rabbit_amqqueue:basic_cancel( + Q, self(), ConsumerTag, undefined, + <<"broker">>, QueueStates1), + %% return all in-flight messages for the consumer + {MsgIds, Rem} = lists:foldl( + fun({_DelTag, ConTag, _Time, {_, MsgId}}, + {Ids, Rem}) when ConTag == ConsumerTag -> + {[MsgId | Ids], Rem}; + (Unacked, {Ids, Rem}) -> + {Ids, ?QUEUE:in(Unacked, Rem)} + end, {[], ?QUEUE:new()}, + ?QUEUE:to_list(UAMQ)), + QueueStates = rabbit_amqqueue:requeue(QPid, {ConsumerTag, MsgIds}, + self(), QueueStates2), + + State = State1#ch{queue_states = QueueStates, + queue_consumers = maps:remove(QRef, QCons), + unacked_message_q = Rem}, + noreply(init_queue_cleanup_timer(State)) + end; _ -> noreply( init_queue_cleanup_timer( State0#ch{queue_states = QueueStates1})) end; -handle_info({channel_source, Source}, State = #ch{}) -> - noreply(State#ch{source = Source}). +handle_info({channel_source, Source}, State = #ch{cfg = Cfg}) -> + noreply(State#ch{cfg = Cfg#conf{source = Source}}). handle_pre_hibernate(State) -> ok = clear_permission_cache(), @@ -877,7 +905,8 @@ handle_pre_hibernate(State) -> end), {hibernate, rabbit_event:stop_stats_timer(State, #ch.stats_timer)}. -terminate(_Reason, State = #ch{user = #user{username = Username}}) -> +terminate(_Reason, + State = #ch{cfg = #conf{user = #user{username = Username}}}) -> {_Res, _State1} = notify_queues(State), pg_local:leave(rabbit_channels, self()), rabbit_event:if_enabled(State, #ch.stats_timer, @@ -930,22 +959,23 @@ return_ok(State, false, Msg) -> {reply, Msg, State}. ok_msg(true, _Msg) -> undefined; ok_msg(false, Msg) -> Msg. -send(_Command, #ch{state = closing}) -> +send(_Command, #ch{cfg = #conf{state = closing}}) -> ok; -send(Command, #ch{writer_pid = WriterPid}) -> +send(Command, #ch{cfg = #conf{writer_pid = WriterPid}}) -> ok = rabbit_writer:send_command(WriterPid, Command). format_soft_error(#amqp_error{name = N, explanation = E, method = M}) -> io_lib:format("operation ~s caused a channel exception ~s: ~ts", [M, N, E]). -handle_exception(Reason, State = #ch{protocol = Protocol, - channel = Channel, - writer_pid = WriterPid, - reader_pid = ReaderPid, - conn_pid = ConnPid, - conn_name = ConnName, - virtual_host = VHost, - user = User}) -> +handle_exception(Reason, State = #ch{cfg = #conf{protocol = Protocol, + channel = Channel, + writer_pid = WriterPid, + reader_pid = ReaderPid, + conn_pid = ConnPid, + conn_name = ConnName, + virtual_host = VHost, + user = User + }}) -> %% something bad's happened: notify_queues may not be 'ok' {_Result, State1} = notify_queues(State), case rabbit_binary_generator:map_exception(Channel, Reason, Protocol) of @@ -972,11 +1002,12 @@ precondition_failed(Format, Params) -> rabbit_misc:protocol_error(precondition_failed, Format, Params). return_queue_declare_ok(#resource{name = ActualName}, - NoWait, MessageCount, ConsumerCount, State) -> - return_ok(State#ch{most_recently_declared_queue = ActualName}, NoWait, - #'queue.declare_ok'{queue = ActualName, - message_count = MessageCount, - consumer_count = ConsumerCount}). + NoWait, MessageCount, ConsumerCount, + #ch{cfg = Cfg} = State) -> + return_ok(State#ch{cfg = Cfg#conf{most_recently_declared_queue = ActualName}}, + NoWait, #'queue.declare_ok'{queue = ActualName, + message_count = MessageCount, + consumer_count = ConsumerCount}). check_resource_access(User, Resource, Perm) -> V = {Resource, Perm}, @@ -1014,15 +1045,15 @@ check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) -> check_user_id_header(#'P_basic'{user_id = undefined}, _) -> ok; check_user_id_header(#'P_basic'{user_id = Username}, - #ch{user = #user{username = Username}}) -> + #ch{cfg = #conf{user = #user{username = Username}}}) -> ok; check_user_id_header( - #'P_basic'{}, #ch{user = #user{authz_backends = - [{rabbit_auth_backend_dummy, _}]}}) -> + #'P_basic'{}, #ch{cfg = #conf{user = #user{authz_backends = + [{rabbit_auth_backend_dummy, _}]}}}) -> ok; check_user_id_header(#'P_basic'{user_id = Claimed}, - #ch{user = #user{username = Actual, - tags = Tags}}) -> + #ch{cfg = #conf{user = #user{username = Actual, + tags = Tags}}}) -> case lists:member(impersonator, Tags) of true -> ok; false -> precondition_failed( @@ -1131,18 +1162,18 @@ qbin_to_resource(QueueNameBin, VHostPath) -> name_to_resource(Type, NameBin, VHostPath) -> rabbit_misc:r(VHostPath, Type, NameBin). -expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) -> +expand_queue_name_shortcut(<<>>, #ch{cfg = #conf{most_recently_declared_queue = <<>>}}) -> rabbit_misc:protocol_error(not_found, "no previously declared queue", []); -expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = MRDQ}) -> +expand_queue_name_shortcut(<<>>, #ch{cfg = #conf{most_recently_declared_queue = MRDQ}}) -> MRDQ; expand_queue_name_shortcut(QueueNameBin, _) -> QueueNameBin. expand_routing_key_shortcut(<<>>, <<>>, - #ch{most_recently_declared_queue = <<>>}) -> + #ch{cfg = #conf{most_recently_declared_queue = <<>>}}) -> rabbit_misc:protocol_error(not_found, "no previously declared queue", []); expand_routing_key_shortcut(<<>>, <<>>, - #ch{most_recently_declared_queue = MRDQ}) -> + #ch{cfg = #conf{most_recently_declared_queue = MRDQ}}) -> MRDQ; expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) -> RoutingKey. @@ -1230,9 +1261,10 @@ record_confirms(MXs, State = #ch{confirmed = C}) -> handle_method({Method, Content}, State) -> handle_method(Method, Content, State). -handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> +handle_method(#'channel.open'{}, _, + State = #ch{cfg = #conf{state = starting} = Cfg}) -> %% Don't leave "starting" as the state for 5s. TODO is this TRTTD? - State1 = State#ch{state = running}, + State1 = State#ch{cfg = Cfg#conf{state = running}}, rabbit_event:if_enabled(State1, #ch.stats_timer, fun() -> emit_stats(State1) end), {reply, #'channel.open_ok'{}, State1}; @@ -1241,21 +1273,23 @@ handle_method(#'channel.open'{}, _, _State) -> rabbit_misc:protocol_error( channel_error, "second 'channel.open' seen", []); -handle_method(_Method, _, #ch{state = starting}) -> +handle_method(_Method, _, #ch{cfg = #conf{state = starting}}) -> rabbit_misc:protocol_error(channel_error, "expected 'channel.open'", []); -handle_method(#'channel.close_ok'{}, _, #ch{state = closing}) -> +handle_method(#'channel.close_ok'{}, _, #ch{cfg = #conf{state = closing}}) -> stop; -handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid, - state = closing}) -> +handle_method(#'channel.close'{}, _, + State = #ch{cfg = #conf{state = closing, + writer_pid = WriterPid}}) -> ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}), {noreply, State}; -handle_method(_Method, _, State = #ch{state = closing}) -> +handle_method(_Method, _, State = #ch{cfg = #conf{state = closing}}) -> {noreply, State}; -handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) -> +handle_method(#'channel.close'{}, _, + State = #ch{cfg = #conf{reader_pid = ReaderPid}}) -> {_Result, State1} = notify_queues(State), %% We issue the channel.close_ok response after a handshake with %% the reader, the other half of which is ready_for_close. That @@ -1287,17 +1321,19 @@ handle_method(#'basic.publish'{immediate = true}, _Content, _State) -> handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory}, - Content, State = #ch{virtual_host = VHostPath, + Content, State = #ch{cfg = #conf{channel = ChannelNum, + conn_pid = ConnPid, + source = ChSrc, + conn_name = ConnName, + virtual_host = VHostPath, + user = #user{username = Username} = User, + trace_state = TraceState, + max_message_size = MaxMessageSize + }, tx = Tx, - channel = ChannelNum, confirm_enabled = ConfirmEnabled, - trace_state = TraceState, - user = #user{username = Username} = User, - conn_name = ConnName, - delivery_flow = Flow, - conn_pid = ConnPid, - source = ChSrc, - max_message_size = MaxMessageSize}) -> + delivery_flow = Flow + }) -> check_msg_size(Content, MaxMessageSize), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, User), @@ -1352,12 +1388,13 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, end}; handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, - _, State = #ch{writer_pid = WriterPid, - conn_pid = ConnPid, - limiter = Limiter, + _, State = #ch{cfg = #conf{writer_pid = WriterPid, + conn_pid = ConnPid, + user = User, + virtual_host = VHostPath + }, + limiter = Limiter, next_tag = DeliveryTag, - user = User, - virtual_host = VHostPath, queue_states = QueueStates0}) -> QueueName = qbin_to_resource(QueueNameBin, VHostPath), check_read_permitted(QueueName, User), @@ -1437,10 +1474,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin, exclusive = ExclusiveConsume, nowait = NoWait, arguments = Args}, - _, State = #ch{consumer_prefetch = ConsumerPrefetch, - consumer_mapping = ConsumerMapping, - user = User, - virtual_host = VHostPath}) -> + _, State = #ch{cfg = #conf{consumer_prefetch = ConsumerPrefetch, + user = User, + virtual_host = VHostPath}, + consumer_mapping = ConsumerMapping + }) -> case maps:find(ConsumerTag, ConsumerMapping) of error -> QueueName = qbin_to_resource(QueueNameBin, VHostPath), @@ -1472,9 +1510,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin, end; handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, - _, State = #ch{consumer_mapping = ConsumerMapping, + _, State = #ch{cfg = #conf{user = #user{username = Username}}, + consumer_mapping = ConsumerMapping, queue_consumers = QCons, - user = #user{username = Username}, queue_states = QueueStates0}) -> OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag}, case maps:find(ConsumerTag, ConsumerMapping) of @@ -1522,10 +1560,11 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> handle_method(#'basic.qos'{global = false, prefetch_count = PrefetchCount}, - _, State = #ch{limiter = Limiter}) -> + _, State = #ch{cfg = Cfg, + limiter = Limiter}) -> %% Ensures that if default was set, it's overridden Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter), - {reply, #'basic.qos_ok'{}, State#ch{consumer_prefetch = PrefetchCount, + {reply, #'basic.qos_ok'{}, State#ch{cfg = Cfg#conf{consumer_prefetch = PrefetchCount}, limiter = Limiter1}}; handle_method(#'basic.qos'{global = true, @@ -1583,87 +1622,87 @@ handle_method(#'basic.reject'{delivery_tag = DeliveryTag, requeue = Requeue}, reject(DeliveryTag, Requeue, false, State); handle_method(#'exchange.declare'{nowait = NoWait} = Method, - _, State = #ch{virtual_host = VHostPath, - user = User, - queue_collector_pid = CollectorPid, - conn_pid = ConnPid, - source = ChSrc}) -> + _, State = #ch{cfg = #conf{virtual_host = VHostPath, + user = User, + queue_collector_pid = CollectorPid, + conn_pid = ConnPid, + source = ChSrc}}) -> handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.declare_ok'{}); handle_method(#'exchange.delete'{nowait = NoWait} = Method, - _, State = #ch{conn_pid = ConnPid, - source = ChSrc, - virtual_host = VHostPath, - queue_collector_pid = CollectorPid, - user = User}) -> + _, State = #ch{cfg = #conf{conn_pid = ConnPid, + source = ChSrc, + virtual_host = VHostPath, + queue_collector_pid = CollectorPid, + user = User}}) -> handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.delete_ok'{}); handle_method(#'exchange.bind'{nowait = NoWait} = Method, - _, State = #ch{virtual_host = VHostPath, - conn_pid = ConnPid, - source = ChSrc, - queue_collector_pid = CollectorPid, - user = User}) -> + _, State = #ch{cfg = #conf{virtual_host = VHostPath, + conn_pid = ConnPid, + source = ChSrc, + queue_collector_pid = CollectorPid, + user = User}}) -> handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.bind_ok'{}); handle_method(#'exchange.unbind'{nowait = NoWait} = Method, - _, State = #ch{virtual_host = VHostPath, - conn_pid = ConnPid, - source = ChSrc, - queue_collector_pid = CollectorPid, - user = User}) -> + _, State = #ch{cfg = #conf{virtual_host = VHostPath, + conn_pid = ConnPid, + source = ChSrc, + queue_collector_pid = CollectorPid, + user = User}}) -> handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.unbind_ok'{}); handle_method(#'queue.declare'{nowait = NoWait} = Method, - _, State = #ch{virtual_host = VHostPath, - conn_pid = ConnPid, - source = ChSrc, - queue_collector_pid = CollectorPid, - user = User}) -> + _, State = #ch{cfg = #conf{virtual_host = VHostPath, + conn_pid = ConnPid, + source = ChSrc, + queue_collector_pid = CollectorPid, + user = User}}) -> {ok, QueueName, MessageCount, ConsumerCount} = handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, State); handle_method(#'queue.delete'{nowait = NoWait} = Method, _, - State = #ch{conn_pid = ConnPid, - source = ChSrc, - virtual_host = VHostPath, - queue_collector_pid = CollectorPid, - user = User}) -> + State = #ch{cfg = #conf{conn_pid = ConnPid, + source = ChSrc, + virtual_host = VHostPath, + queue_collector_pid = CollectorPid, + user = User}}) -> {ok, PurgedMessageCount} = handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'queue.delete_ok'{message_count = PurgedMessageCount}); handle_method(#'queue.bind'{nowait = NoWait} = Method, _, - State = #ch{conn_pid = ConnPid, - source = ChSrc, - user = User, - queue_collector_pid = CollectorPid, - virtual_host = VHostPath}) -> + State = #ch{cfg = #conf{conn_pid = ConnPid, + source = ChSrc, + user = User, + queue_collector_pid = CollectorPid, + virtual_host = VHostPath}}) -> handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'queue.bind_ok'{}); handle_method(#'queue.unbind'{} = Method, _, - State = #ch{conn_pid = ConnPid, - source = ChSrc, - user = User, - queue_collector_pid = CollectorPid, - virtual_host = VHostPath}) -> + State = #ch{cfg = #conf{conn_pid = ConnPid, + source = ChSrc, + user = User, + queue_collector_pid = CollectorPid, + virtual_host = VHostPath}}) -> handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, false, #'queue.unbind_ok'{}); handle_method(#'queue.purge'{nowait = NoWait} = Method, - _, State = #ch{conn_pid = ConnPid, - source = ChSrc, - user = User, - queue_collector_pid = CollectorPid, - virtual_host = VHostPath}) -> + _, State = #ch{cfg = #conf{conn_pid = ConnPid, + source = ChSrc, + user = User, + queue_collector_pid = CollectorPid, + virtual_host = VHostPath}}) -> case handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User) of {ok, PurgedMessageCount} -> @@ -1742,11 +1781,11 @@ handle_method(_MethodRecord, _Content, _State) -> %% for why. basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, ExclusiveConsume, Args, NoWait, - State = #ch{conn_pid = ConnPid, - limiter = Limiter, + State = #ch{cfg = #conf{conn_pid = ConnPid, + user = #user{username = Username}}, + limiter = Limiter, consumer_mapping = ConsumerMapping, - user = #user{username = Username}, - queue_states = QueueStates0}) -> + queue_states = QueueStates0}) -> case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, fun (Q) -> @@ -1866,8 +1905,9 @@ handle_consuming_queue_down_or_eol(QRef, %% not an HA failover. But the likelihood is not great and most users %% are unlikely to care. -cancel_consumer(CTag, QName, State = #ch{capabilities = Capabilities, - consumer_mapping = CMap}) -> +cancel_consumer(CTag, QName, + State = #ch{cfg = #conf{capabilities = Capabilities}, + consumer_mapping = CMap}) -> case rabbit_misc:table_lookup( Capabilities, <<"consumer_cancel_notify">>) of {bool, true} -> ok = @@ -1940,7 +1980,8 @@ binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0, basic_return(#basic_message{exchange_name = ExchangeName, routing_keys = [RoutingKey | _CcRoutes], content = Content}, - State = #ch{protocol = Protocol, writer_pid = WriterPid}, + State = #ch{cfg = #conf{protocol = Protocol, + writer_pid = WriterPid}}, Reason) -> ?INCR_STATS(exchange_stats, ExchangeName, 1, return_unroutable, State), {_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason), @@ -1975,12 +2016,14 @@ internal_reject(Requeue, Acked, Limiter, record_sent(Type, Tag, AckRequired, Msg = {QName, QPid, MsgId, Redelivered, _Message}, - State = #ch{unacked_message_q = UAMQ, - next_tag = DeliveryTag, - trace_state = TraceState, - user = #user{username = Username}, - conn_name = ConnName, - channel = ChannelNum}) -> + State = #ch{cfg = #conf{channel = ChannelNum, + trace_state = TraceState, + user = #user{username = Username}, + conn_name = ConnName + }, + unacked_message_q = UAMQ, + next_tag = DeliveryTag + }) -> ?INCR_STATS(queue_stats, QName, 1, case {Type, AckRequired} of {get, true} -> get; {get, false} -> get_no_ack; @@ -2063,9 +2106,10 @@ incr_queue_stats(QPid, QNames, MsgIds, State) -> %% ack first" order. new_tx() -> {?QUEUE:new(), []}. -notify_queues(State = #ch{state = closing}) -> +notify_queues(State = #ch{cfg = #conf{state = closing}}) -> {ok, State}; notify_queues(State = #ch{consumer_mapping = Consumers, + cfg = Cfg, delivering_queues = DQ }) -> QRefs0 = sets:to_list( sets:union(sets:from_list(consumer_queue_refs(Consumers)), DQ)), @@ -2073,7 +2117,7 @@ notify_queues(State = #ch{consumer_mapping = Consumers, QPids = [P || P <- QRefs0, ?IS_CLASSIC(P)], Timeout = get_operation_timeout(), {rabbit_amqqueue:notify_down_all(QPids, self(), Timeout), - State#ch{state = closing}}. + State#ch{cfg = Cfg#conf{state = closing}}}. foreach_per_queue(_F, [], Acc) -> Acc; @@ -2223,7 +2267,7 @@ send_confirms_and_nacks(State) -> send_nacks([], _, State) -> State; -send_nacks(_Rs, _, State = #ch{state = closing}) -> %% optimisation +send_nacks(_Rs, _, State = #ch{cfg = #conf{state = closing}}) -> %% optimisation State; send_nacks(Rs, Cs, State) -> coalesce_and_send(Rs, Cs, @@ -2234,7 +2278,7 @@ send_nacks(Rs, Cs, State) -> send_confirms([], _, State) -> State; -send_confirms(_Cs, _, State = #ch{state = closing}) -> %% optimisation +send_confirms(_Cs, _, State = #ch{cfg = #conf{state = closing}}) -> %% optimisation State; send_confirms([MsgSeqNo], _, State) -> ok = send(#'basic.ack'{delivery_tag = MsgSeqNo}, State), @@ -2299,14 +2343,14 @@ infos(Items, Deadline, State) -> end || Item <- Items]. i(pid, _) -> self(); -i(connection, #ch{conn_pid = ConnPid}) -> ConnPid; -i(number, #ch{channel = Channel}) -> Channel; -i(user, #ch{user = User}) -> User#user.username; +i(connection, #ch{cfg = #conf{conn_pid = ConnPid}}) -> ConnPid; +i(number, #ch{cfg = #conf{channel = Channel}}) -> Channel; +i(user, #ch{cfg = #conf{user = User}}) -> User#user.username; i(user_who_performed_action, Ch) -> i(user, Ch); -i(vhost, #ch{virtual_host = VHost}) -> VHost; +i(vhost, #ch{cfg = #conf{virtual_host = VHost}}) -> VHost; i(transactional, #ch{tx = Tx}) -> Tx =/= none; i(confirm, #ch{confirm_enabled = CE}) -> CE; -i(source, #ch{source = ChSrc}) -> ChSrc; +i(source, #ch{cfg = #conf{source = ChSrc}}) -> ChSrc; i(name, State) -> name(State); i(consumer_count, #ch{consumer_mapping = CM}) -> maps:size(CM); i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC); @@ -2315,9 +2359,9 @@ i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> ?QUEUE:len(Msgs); i(messages_uncommitted, #ch{}) -> 0; i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks); i(acks_uncommitted, #ch{}) -> 0; -i(state, #ch{state = running}) -> credit_flow:state(); -i(state, #ch{state = State}) -> State; -i(prefetch_count, #ch{consumer_prefetch = C}) -> C; +i(state, #ch{cfg = #conf{state = running}}) -> credit_flow:state(); +i(state, #ch{cfg = #conf{state = State}}) -> State; +i(prefetch_count, #ch{cfg = #conf{consumer_prefetch = C}}) -> C; i(global_prefetch_count, #ch{limiter = Limiter}) -> rabbit_limiter:get_prefetch_limit(Limiter); i(interceptors, #ch{interceptor_state = IState}) -> @@ -2330,7 +2374,7 @@ i(reductions, _State) -> i(Item, _) -> throw({bad_argument, Item}). -name(#ch{conn_name = ConnName, channel = Channel}) -> +name(#ch{cfg = #conf{conn_name = ConnName, channel = Channel}}) -> list_to_binary(rabbit_misc:format("~s (~p)", [ConnName, Channel])). emit_stats(State) -> emit_stats(State, []). @@ -2353,9 +2397,9 @@ erase_queue_stats(QName) -> end || {{queue_exchange_stats, QX = {QName0, _}}, _} <- get(), QName0 =:= QName]. -get_vhost(#ch{virtual_host = VHost}) -> VHost. +get_vhost(#ch{cfg = #conf{virtual_host = VHost}}) -> VHost. -get_user(#ch{user = User}) -> User. +get_user(#ch{cfg = #conf{user = User}}) -> User. delete_stats({queue_stats, QName}) -> rabbit_core_metrics:channel_queue_down({self(), QName}); @@ -2615,7 +2659,7 @@ handle_deliver(ConsumerTag, AckRequired, #basic_message{exchange_name = ExchangeName, routing_keys = [RoutingKey | _CcRoutes], content = Content}}, - State = #ch{writer_pid = WriterPid, + State = #ch{cfg = #conf{writer_pid = WriterPid}, next_tag = DeliveryTag}) -> Deliver = #'basic.deliver'{consumer_tag = ConsumerTag, delivery_tag = DeliveryTag, |
