diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2019-04-24 23:51:40 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2019-04-24 23:51:40 +0300 |
| commit | c9edbec027222ff276933bc9168b7f1f159c2a00 (patch) | |
| tree | 6f73b03d5d7b08fdeb4e793d110b805644de39ca /src | |
| parent | 388bd440babf102d8d7d0289e23bdd5e7df8604b (diff) | |
| parent | 20221724ac23389221fb7f9534ffc9978c282bba (diff) | |
| download | rabbitmq-server-git-c9edbec027222ff276933bc9168b7f1f159c2a00.tar.gz | |
Merge branch 'master' into quorum-ets-memory-breakdown
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 661 |
2 files changed, 396 insertions, 269 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index fac9d5e50f..8855cfede4 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -74,6 +74,7 @@ -export_type([name/0, qmsg/0, absent_reason/0]). -type name() :: rabbit_types:r('queue'). + -type qpids() :: [pid()]. -type qlen() :: rabbit_types:ok(non_neg_integer()). -type qfun(A) :: fun ((amqqueue:amqqueue()) -> A | no_return()). @@ -1229,12 +1230,11 @@ purge(Q) when ?amqqueue_is_quorum(Q) -> NodeId = amqqueue:get_pid(Q), rabbit_quorum_queue:purge(NodeId). --spec requeue(pid(), +-spec requeue(pid() | amqqueue:ra_server_id(), {rabbit_fifo:consumer_tag(), [msg_id()]}, pid(), quorum_states()) -> 'ok'. - requeue(QPid, {_, MsgIds}, ChPid, QuorumStates) when ?IS_CLASSIC(QPid) -> ok = delegate:invoke(QPid, {gen_server2, call, [{requeue, MsgIds, ChPid}, infinity]}), QuorumStates; diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 59a8cce371..ff6186ad84 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -66,8 +66,9 @@ -export([source/2]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2, handle_pre_hibernate/1, prioritise_call/4, - prioritise_cast/3, prioritise_info/3, format_message_queue/2]). + handle_info/2, handle_pre_hibernate/1, handle_post_hibernate/1, + prioritise_call/4, prioritise_cast/3, prioritise_info/3, + format_message_queue/2]). -deprecated([{force_event_refresh, 1, eventually}]). @@ -81,91 +82,96 @@ %% Mgmt HTTP API refactor -export([handle_method/6]). --record(ch, { - %% starting | running | flow | closing - state, - %% same as reader's protocol. Used when instantiating - %% (protocol) exceptions. - protocol, - %% channel number - channel, - %% reader process - reader_pid, - %% writer process - writer_pid, - %% - conn_pid, - %% same as reader's name, see #v1.name - %% in rabbit_reader - conn_name, - %% channel's originating source e.g. rabbit_reader | rabbit_direct | undefined - %% or any other channel creating/spawning entity - source, - %% limiter pid, see rabbit_limiter - limiter, - %% none | {Msgs, Acks} | committing | failed | - tx, - %% (consumer) delivery tag sequence - next_tag, - %% messages pending consumer acknowledgement - unacked_message_q, - %% same as #v1.user in the reader, used in - %% authorisation checks - user, - %% same as #v1.user in the reader - virtual_host, - %% when queue.bind's queue field is empty, - %% this name will be used instead - most_recently_declared_queue, - %% a map of queue ref to queue name - queue_names, - %% queue processes are monitored to update - %% queue names - queue_monitors, - %% a map of consumer tags to - %% consumer details: #amqqueue record, acknowledgement mode, - %% consumer exclusivity, etc - consumer_mapping, - %% a map of queue pids to consumer tag lists - queue_consumers, - %% a set of pids of queues that have unacknowledged - %% deliveries - delivering_queues, - %% when a queue is declared as exclusive, queue - %% collector must be notified. - %% see rabbit_queue_collector for more info. - queue_collector_pid, - %% timer used to emit statistics - stats_timer, - %% are publisher confirms enabled for this channel? - confirm_enabled, - %% publisher confirm delivery tag sequence - publish_seqno, - %% a dtree used to track unconfirmed - %% (to publishers) messages - unconfirmed, - %% a list of tags for published messages that were - %% delivered but are yet to be confirmed to the client - confirmed, - %% a list of tags for published messages that were - %% rejected but are yet to be sent to the client - rejected, - %% same as capabilities in the reader - capabilities, - %% tracing exchange resource if tracing is enabled, - %% 'none' otherwise - trace_state, - consumer_prefetch, - %% used by "one shot RPC" (amq. - reply_consumer, - %% flow | noflow, see rabbitmq-server#114 - delivery_flow, - interceptor_state, - queue_states, - queue_cleanup_timer, - %% Message content size limit - max_message_size -}). +-record(conf, { + %% starting | running | flow | closing + state, + %% same as reader's protocol. Used when instantiating + %% (protocol) exceptions. + protocol, + %% channel number + channel, + %% reader process + reader_pid, + %% writer process + writer_pid, + %% + conn_pid, + %% same as reader's name, see #v1.name + %% in rabbit_reader + conn_name, + %% channel's originating source e.g. rabbit_reader | rabbit_direct | undefined + %% or any other channel creating/spawning entity + source, + %% same as #v1.user in the reader, used in + %% authorisation checks + user, + %% same as #v1.user in the reader + virtual_host, + %% when queue.bind's queue field is empty, + %% this name will be used instead + most_recently_declared_queue, + %% when a queue is declared as exclusive, queue + %% collector must be notified. + %% see rabbit_queue_collector for more info. + queue_collector_pid, + + %% same as capabilities in the reader + capabilities, + %% tracing exchange resource if tracing is enabled, + %% 'none' otherwise + trace_state, + consumer_prefetch, + %% Message content size limit + max_message_size, + consumer_timeout + }). + +-record(ch, {cfg :: #conf{}, + %% limiter state, see rabbit_limiter + limiter, + %% none | {Msgs, Acks} | committing | failed | + tx, + %% (consumer) delivery tag sequence + next_tag, + %% messages pending consumer acknowledgement + unacked_message_q, + %% a map of queue ref to queue name + queue_names, + %% queue processes are monitored to update + %% queue names + queue_monitors, + %% a map of consumer tags to + %% consumer details: #amqqueue record, acknowledgement mode, + %% consumer exclusivity, etc + consumer_mapping, + %% a map of queue pids to consumer tag lists + queue_consumers, + %% a set of pids of queues that have unacknowledged + %% deliveries + delivering_queues, + %% timer used to emit statistics + stats_timer, + %% are publisher confirms enabled for this channel? + confirm_enabled, + %% publisher confirm delivery tag sequence + publish_seqno, + %% a dtree used to track unconfirmed + %% (to publishers) messages + unconfirmed, + %% a list of tags for published messages that were + %% delivered but are yet to be confirmed to the client + confirmed, + %% a list of tags for published messages that were + %% rejected but are yet to be sent to the client + rejected, + %% used by "one shot RPC" (amq. + reply_consumer, + %% flow | noflow, see rabbitmq-server#114 + delivery_flow, + interceptor_state, + queue_states, + tick_timer + }). -define(QUEUE, lqueue). @@ -483,39 +489,43 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, Limiter0 end, MaxMessageSize = get_max_message_size(), - State = #ch{state = starting, - protocol = Protocol, - channel = Channel, - reader_pid = ReaderPid, - writer_pid = WriterPid, - conn_pid = ConnPid, - conn_name = ConnName, - limiter = Limiter, + ConsumerTimeout = get_consumer_timeout(), + State = #ch{cfg = #conf{state = starting, + protocol = Protocol, + channel = Channel, + reader_pid = ReaderPid, + writer_pid = WriterPid, + conn_pid = ConnPid, + conn_name = ConnName, + user = User, + virtual_host = VHost, + most_recently_declared_queue = <<>>, + queue_collector_pid = CollectorPid, + capabilities = Capabilities, + trace_state = rabbit_trace:init(VHost), + consumer_prefetch = Prefetch, + max_message_size = MaxMessageSize, + consumer_timeout = ConsumerTimeout + }, + limiter = Limiter, tx = none, next_tag = 1, unacked_message_q = ?QUEUE:new(), - user = User, - virtual_host = VHost, - most_recently_declared_queue = <<>>, queue_names = #{}, queue_monitors = pmon:new(), consumer_mapping = #{}, queue_consumers = #{}, delivering_queues = sets:new(), - queue_collector_pid = CollectorPid, confirm_enabled = false, publish_seqno = 1, unconfirmed = dtree:empty(), rejected = [], confirmed = [], - capabilities = Capabilities, - trace_state = rabbit_trace:init(VHost), - consumer_prefetch = Prefetch, reply_consumer = none, delivery_flow = Flow, interceptor_state = undefined, - queue_states = #{}, - max_message_size = MaxMessageSize}, + queue_states = #{} + }, State1 = State#ch{ interceptor_state = rabbit_channel_interceptor:init(State)}, State2 = rabbit_event:init_stats_timer(State1, #ch.stats_timer), @@ -525,7 +535,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, rabbit_event:if_enabled(State2, #ch.stats_timer, fun() -> emit_stats(State2) end), put_operation_timeout(), - State3 = init_queue_cleanup_timer(State2), + State3 = init_tick_timer(State2), {ok, State3, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -568,8 +578,9 @@ handle_call({{info, Items}, Deadline}, _From, State) -> reply({error, Error}, State) end; -handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) -> - reply(ok, State#ch{trace_state = rabbit_trace:init(VHost)}); +handle_call(refresh_config, _From, + State = #ch{cfg = #conf{virtual_host = VHost} = Cfg}) -> + reply(ok, State#ch{cfg = Cfg#conf{trace_state = rabbit_trace:init(VHost)}}); handle_call(refresh_interceptors, _From, State) -> IState = rabbit_channel_interceptor:init(State), @@ -590,7 +601,7 @@ handle_call(_Request, _From, State) -> noreply(State). handle_cast({method, Method, Content, Flow}, - State = #ch{reader_pid = Reader, + State = #ch{cfg = #conf{reader_pid = Reader}, interceptor_state = IState}) -> case Flow of %% We are going to process a message from the rabbit_reader @@ -617,12 +628,13 @@ handle_cast({method, Method, Content, Flow}, {stop, {Reason, erlang:get_stacktrace()}, State} end; -handle_cast(ready_for_close, State = #ch{state = closing, - writer_pid = WriterPid}) -> +handle_cast(ready_for_close, + State = #ch{cfg = #conf{state = closing, + writer_pid = WriterPid}}) -> ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}), {stop, normal, State}; -handle_cast(terminate, State = #ch{writer_pid = WriterPid}) -> +handle_cast(terminate, State = #ch{cfg = #conf{writer_pid = WriterPid}}) -> ok = rabbit_writer:flush(WriterPid), {stop, normal, State}; @@ -634,12 +646,14 @@ handle_cast({command, Msg}, State) -> ok = send(Msg, State), noreply(State); -handle_cast({deliver, _CTag, _AckReq, _Msg}, State = #ch{state = closing}) -> +handle_cast({deliver, _CTag, _AckReq, _Msg}, + State = #ch{cfg = #conf{state = closing}}) -> noreply(State); handle_cast({deliver, ConsumerTag, AckRequired, Msg}, State) -> noreply(handle_deliver(ConsumerTag, AckRequired, Msg, State)); -handle_cast({deliver_reply, _K, _Del}, State = #ch{state = closing}) -> +handle_cast({deliver_reply, _K, _Del}, + State = #ch{cfg = #conf{state = closing}}) -> noreply(State); handle_cast({deliver_reply, _K, _Del}, State = #ch{reply_consumer = none}) -> noreply(State); @@ -647,8 +661,8 @@ handle_cast({deliver_reply, Key, #delivery{message = #basic_message{exchange_name = ExchangeName, routing_keys = [RoutingKey | _CcRoutes], content = Content}}}, - State = #ch{writer_pid = WriterPid, - next_tag = DeliveryTag, + State = #ch{cfg = #conf{writer_pid = WriterPid}, + next_tag = DeliveryTag, reply_consumer = {ConsumerTag, _Suffix, Key}}) -> ok = rabbit_writer:send_command( WriterPid, @@ -662,12 +676,14 @@ handle_cast({deliver_reply, Key, #delivery{message = handle_cast({deliver_reply, _K1, _}, State=#ch{reply_consumer = {_, _, _K2}}) -> noreply(State); -handle_cast({send_credit_reply, Len}, State = #ch{writer_pid = WriterPid}) -> +handle_cast({send_credit_reply, Len}, + State = #ch{cfg = #conf{writer_pid = WriterPid}}) -> ok = rabbit_writer:send_command( WriterPid, #'basic.credit_ok'{available = Len}), noreply(State); -handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) -> +handle_cast({send_drained, CTagCredit}, + State = #ch{cfg = #conf{writer_pid = WriterPid}}) -> [ok = rabbit_writer:send_command( WriterPid, #'basic.credit_drained'{consumer_tag = ConsumerTag, credit_drained = CreditDrained}) @@ -734,7 +750,7 @@ handle_info({ra_event, {Name, _} = From, _} = Evt, {internal, MsgSeqNos, Actions, QState1} -> State = State0#ch{queue_states = maps:put(Name, QState1, QueueStates)}, %% execute actions - WriterPid = State#ch.writer_pid, + WriterPid = State#ch.cfg#conf.writer_pid, lists:foreach(fun ({send_credit_reply, Avail}) -> ok = rabbit_writer:send_command( WriterPid, @@ -805,34 +821,45 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; -handle_info({{Ref, Node}, LateAnswer}, State = #ch{channel = Channel}) +handle_info({{Ref, Node}, LateAnswer}, + State = #ch{cfg = #conf{channel = Channel}}) when is_reference(Ref) -> rabbit_log_channel:warning("Channel ~p ignoring late answer ~p from ~p", [Channel, LateAnswer, Node]), noreply(State); -handle_info(queue_cleanup, State = #ch{queue_states = QueueStates0}) -> - QueueStates = +handle_info(tick, State0 = #ch{queue_states = QueueStates0}) -> + QueueStates1 = maps:filter(fun(_, QS) -> - QName = rabbit_quorum_queue:queue_name(QS), - [] /= rabbit_amqqueue:lookup(QName) + QName = rabbit_quorum_queue:queue_name(QS), + [] /= rabbit_amqqueue:lookup([QName]) end, QueueStates0), - noreply(init_queue_cleanup_timer(State#ch{queue_states = QueueStates})); - -handle_info({channel_source, Source}, State = #ch{}) -> - noreply(State#ch{source = Source}). + case evaluate_consumer_timeout(State0#ch{queue_states = QueueStates1}) of + {noreply, State} -> + noreply(init_tick_timer(reset_tick_timer(State))); + Return -> + Return + end; +handle_info({channel_source, Source}, State = #ch{cfg = Cfg}) -> + noreply(State#ch{cfg = Cfg#conf{source = Source}}). -handle_pre_hibernate(State) -> +handle_pre_hibernate(State0) -> ok = clear_permission_cache(), + State = maybe_cancel_tick_timer(State0), rabbit_event:if_enabled( State, #ch.stats_timer, fun () -> emit_stats(State, [{idle_since, os:system_time(milli_seconds)}]) - end), + end), {hibernate, rabbit_event:stop_stats_timer(State, #ch.stats_timer)}. -terminate(_Reason, State = #ch{user = #user{username = Username}}) -> +handle_post_hibernate(State0) -> + State = init_tick_timer(State0), + {noreply, State}. + +terminate(_Reason, + State = #ch{cfg = #conf{user = #user{username = Username}}}) -> {_Res, _State1} = notify_queues(State), pg_local:leave(rabbit_channels, self()), rabbit_event:if_enabled(State, #ch.stats_timer, @@ -857,6 +884,13 @@ get_max_message_size() -> ?MAX_MSG_SIZE end. +get_consumer_timeout() -> + case application:get_env(rabbit, consumer_timeout) of + {ok, MS} when is_integer(MS) -> + MS; + _ -> + undefined + end. %%--------------------------------------------------------------------------- reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}. @@ -878,22 +912,23 @@ return_ok(State, false, Msg) -> {reply, Msg, State}. ok_msg(true, _Msg) -> undefined; ok_msg(false, Msg) -> Msg. -send(_Command, #ch{state = closing}) -> +send(_Command, #ch{cfg = #conf{state = closing}}) -> ok; -send(Command, #ch{writer_pid = WriterPid}) -> +send(Command, #ch{cfg = #conf{writer_pid = WriterPid}}) -> ok = rabbit_writer:send_command(WriterPid, Command). format_soft_error(#amqp_error{name = N, explanation = E, method = M}) -> io_lib:format("operation ~s caused a channel exception ~s: ~ts", [M, N, E]). -handle_exception(Reason, State = #ch{protocol = Protocol, - channel = Channel, - writer_pid = WriterPid, - reader_pid = ReaderPid, - conn_pid = ConnPid, - conn_name = ConnName, - virtual_host = VHost, - user = User}) -> +handle_exception(Reason, State = #ch{cfg = #conf{protocol = Protocol, + channel = Channel, + writer_pid = WriterPid, + reader_pid = ReaderPid, + conn_pid = ConnPid, + conn_name = ConnName, + virtual_host = VHost, + user = User + }}) -> %% something bad's happened: notify_queues may not be 'ok' {_Result, State1} = notify_queues(State), case rabbit_binary_generator:map_exception(Channel, Reason, Protocol) of @@ -920,11 +955,12 @@ precondition_failed(Format, Params) -> rabbit_misc:protocol_error(precondition_failed, Format, Params). return_queue_declare_ok(#resource{name = ActualName}, - NoWait, MessageCount, ConsumerCount, State) -> - return_ok(State#ch{most_recently_declared_queue = ActualName}, NoWait, - #'queue.declare_ok'{queue = ActualName, - message_count = MessageCount, - consumer_count = ConsumerCount}). + NoWait, MessageCount, ConsumerCount, + #ch{cfg = Cfg} = State) -> + return_ok(State#ch{cfg = Cfg#conf{most_recently_declared_queue = ActualName}}, + NoWait, #'queue.declare_ok'{queue = ActualName, + message_count = MessageCount, + consumer_count = ConsumerCount}). check_resource_access(User, Resource, Perm) -> V = {Resource, Perm}, @@ -962,15 +998,15 @@ check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) -> check_user_id_header(#'P_basic'{user_id = undefined}, _) -> ok; check_user_id_header(#'P_basic'{user_id = Username}, - #ch{user = #user{username = Username}}) -> + #ch{cfg = #conf{user = #user{username = Username}}}) -> ok; check_user_id_header( - #'P_basic'{}, #ch{user = #user{authz_backends = - [{rabbit_auth_backend_dummy, _}]}}) -> + #'P_basic'{}, #ch{cfg = #conf{user = #user{authz_backends = + [{rabbit_auth_backend_dummy, _}]}}}) -> ok; check_user_id_header(#'P_basic'{user_id = Claimed}, - #ch{user = #user{username = Actual, - tags = Tags}}) -> + #ch{cfg = #conf{user = #user{username = Actual, + tags = Tags}}}) -> case lists:member(impersonator, Tags) of true -> ok; false -> precondition_failed( @@ -1079,18 +1115,18 @@ qbin_to_resource(QueueNameBin, VHostPath) -> name_to_resource(Type, NameBin, VHostPath) -> rabbit_misc:r(VHostPath, Type, NameBin). -expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) -> +expand_queue_name_shortcut(<<>>, #ch{cfg = #conf{most_recently_declared_queue = <<>>}}) -> rabbit_misc:protocol_error(not_found, "no previously declared queue", []); -expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = MRDQ}) -> +expand_queue_name_shortcut(<<>>, #ch{cfg = #conf{most_recently_declared_queue = MRDQ}}) -> MRDQ; expand_queue_name_shortcut(QueueNameBin, _) -> QueueNameBin. expand_routing_key_shortcut(<<>>, <<>>, - #ch{most_recently_declared_queue = <<>>}) -> + #ch{cfg = #conf{most_recently_declared_queue = <<>>}}) -> rabbit_misc:protocol_error(not_found, "no previously declared queue", []); expand_routing_key_shortcut(<<>>, <<>>, - #ch{most_recently_declared_queue = MRDQ}) -> + #ch{cfg = #conf{most_recently_declared_queue = MRDQ}}) -> MRDQ; expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) -> RoutingKey. @@ -1178,9 +1214,10 @@ record_confirms(MXs, State = #ch{confirmed = C}) -> handle_method({Method, Content}, State) -> handle_method(Method, Content, State). -handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> +handle_method(#'channel.open'{}, _, + State = #ch{cfg = #conf{state = starting} = Cfg}) -> %% Don't leave "starting" as the state for 5s. TODO is this TRTTD? - State1 = State#ch{state = running}, + State1 = State#ch{cfg = Cfg#conf{state = running}}, rabbit_event:if_enabled(State1, #ch.stats_timer, fun() -> emit_stats(State1) end), {reply, #'channel.open_ok'{}, State1}; @@ -1189,21 +1226,23 @@ handle_method(#'channel.open'{}, _, _State) -> rabbit_misc:protocol_error( channel_error, "second 'channel.open' seen", []); -handle_method(_Method, _, #ch{state = starting}) -> +handle_method(_Method, _, #ch{cfg = #conf{state = starting}}) -> rabbit_misc:protocol_error(channel_error, "expected 'channel.open'", []); -handle_method(#'channel.close_ok'{}, _, #ch{state = closing}) -> +handle_method(#'channel.close_ok'{}, _, #ch{cfg = #conf{state = closing}}) -> stop; -handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid, - state = closing}) -> +handle_method(#'channel.close'{}, _, + State = #ch{cfg = #conf{state = closing, + writer_pid = WriterPid}}) -> ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}), {noreply, State}; -handle_method(_Method, _, State = #ch{state = closing}) -> +handle_method(_Method, _, State = #ch{cfg = #conf{state = closing}}) -> {noreply, State}; -handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) -> +handle_method(#'channel.close'{}, _, + State = #ch{cfg = #conf{reader_pid = ReaderPid}}) -> {_Result, State1} = notify_queues(State), %% We issue the channel.close_ok response after a handshake with %% the reader, the other half of which is ready_for_close. That @@ -1235,17 +1274,19 @@ handle_method(#'basic.publish'{immediate = true}, _Content, _State) -> handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory}, - Content, State = #ch{virtual_host = VHostPath, + Content, State = #ch{cfg = #conf{channel = ChannelNum, + conn_pid = ConnPid, + source = ChSrc, + conn_name = ConnName, + virtual_host = VHostPath, + user = #user{username = Username} = User, + trace_state = TraceState, + max_message_size = MaxMessageSize + }, tx = Tx, - channel = ChannelNum, confirm_enabled = ConfirmEnabled, - trace_state = TraceState, - user = #user{username = Username} = User, - conn_name = ConnName, - delivery_flow = Flow, - conn_pid = ConnPid, - source = ChSrc, - max_message_size = MaxMessageSize}) -> + delivery_flow = Flow + }) -> check_msg_size(Content, MaxMessageSize), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, User), @@ -1300,12 +1341,13 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, end}; handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, - _, State = #ch{writer_pid = WriterPid, - conn_pid = ConnPid, - limiter = Limiter, + _, State = #ch{cfg = #conf{writer_pid = WriterPid, + conn_pid = ConnPid, + user = User, + virtual_host = VHostPath + }, + limiter = Limiter, next_tag = DeliveryTag, - user = User, - virtual_host = VHostPath, queue_states = QueueStates0}) -> QueueName = qbin_to_resource(QueueNameBin, VHostPath), check_read_permitted(QueueName, User), @@ -1385,10 +1427,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin, exclusive = ExclusiveConsume, nowait = NoWait, arguments = Args}, - _, State = #ch{consumer_prefetch = ConsumerPrefetch, - consumer_mapping = ConsumerMapping, - user = User, - virtual_host = VHostPath}) -> + _, State = #ch{cfg = #conf{consumer_prefetch = ConsumerPrefetch, + user = User, + virtual_host = VHostPath}, + consumer_mapping = ConsumerMapping + }) -> case maps:find(ConsumerTag, ConsumerMapping) of error -> QueueName = qbin_to_resource(QueueNameBin, VHostPath), @@ -1420,9 +1463,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin, end; handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, - _, State = #ch{consumer_mapping = ConsumerMapping, + _, State = #ch{cfg = #conf{user = #user{username = Username}}, + consumer_mapping = ConsumerMapping, queue_consumers = QCons, - user = #user{username = Username}, queue_states = QueueStates0}) -> OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag}, case maps:find(ConsumerTag, ConsumerMapping) of @@ -1470,10 +1513,11 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> handle_method(#'basic.qos'{global = false, prefetch_count = PrefetchCount}, - _, State = #ch{limiter = Limiter}) -> + _, State = #ch{cfg = Cfg, + limiter = Limiter}) -> %% Ensures that if default was set, it's overridden Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter), - {reply, #'basic.qos_ok'{}, State#ch{consumer_prefetch = PrefetchCount, + {reply, #'basic.qos_ok'{}, State#ch{cfg = Cfg#conf{consumer_prefetch = PrefetchCount}, limiter = Limiter1}}; handle_method(#'basic.qos'{global = true, @@ -1531,87 +1575,87 @@ handle_method(#'basic.reject'{delivery_tag = DeliveryTag, requeue = Requeue}, reject(DeliveryTag, Requeue, false, State); handle_method(#'exchange.declare'{nowait = NoWait} = Method, - _, State = #ch{virtual_host = VHostPath, - user = User, - queue_collector_pid = CollectorPid, - conn_pid = ConnPid, - source = ChSrc}) -> + _, State = #ch{cfg = #conf{virtual_host = VHostPath, + user = User, + queue_collector_pid = CollectorPid, + conn_pid = ConnPid, + source = ChSrc}}) -> handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.declare_ok'{}); handle_method(#'exchange.delete'{nowait = NoWait} = Method, - _, State = #ch{conn_pid = ConnPid, - source = ChSrc, - virtual_host = VHostPath, - queue_collector_pid = CollectorPid, - user = User}) -> + _, State = #ch{cfg = #conf{conn_pid = ConnPid, + source = ChSrc, + virtual_host = VHostPath, + queue_collector_pid = CollectorPid, + user = User}}) -> handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.delete_ok'{}); handle_method(#'exchange.bind'{nowait = NoWait} = Method, - _, State = #ch{virtual_host = VHostPath, - conn_pid = ConnPid, - source = ChSrc, - queue_collector_pid = CollectorPid, - user = User}) -> + _, State = #ch{cfg = #conf{virtual_host = VHostPath, + conn_pid = ConnPid, + source = ChSrc, + queue_collector_pid = CollectorPid, + user = User}}) -> handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.bind_ok'{}); handle_method(#'exchange.unbind'{nowait = NoWait} = Method, - _, State = #ch{virtual_host = VHostPath, - conn_pid = ConnPid, - source = ChSrc, - queue_collector_pid = CollectorPid, - user = User}) -> + _, State = #ch{cfg = #conf{virtual_host = VHostPath, + conn_pid = ConnPid, + source = ChSrc, + queue_collector_pid = CollectorPid, + user = User}}) -> handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.unbind_ok'{}); handle_method(#'queue.declare'{nowait = NoWait} = Method, - _, State = #ch{virtual_host = VHostPath, - conn_pid = ConnPid, - source = ChSrc, - queue_collector_pid = CollectorPid, - user = User}) -> + _, State = #ch{cfg = #conf{virtual_host = VHostPath, + conn_pid = ConnPid, + source = ChSrc, + queue_collector_pid = CollectorPid, + user = User}}) -> {ok, QueueName, MessageCount, ConsumerCount} = handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, State); handle_method(#'queue.delete'{nowait = NoWait} = Method, _, - State = #ch{conn_pid = ConnPid, - source = ChSrc, - virtual_host = VHostPath, - queue_collector_pid = CollectorPid, - user = User}) -> + State = #ch{cfg = #conf{conn_pid = ConnPid, + source = ChSrc, + virtual_host = VHostPath, + queue_collector_pid = CollectorPid, + user = User}}) -> {ok, PurgedMessageCount} = handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'queue.delete_ok'{message_count = PurgedMessageCount}); handle_method(#'queue.bind'{nowait = NoWait} = Method, _, - State = #ch{conn_pid = ConnPid, - source = ChSrc, - user = User, - queue_collector_pid = CollectorPid, - virtual_host = VHostPath}) -> + State = #ch{cfg = #conf{conn_pid = ConnPid, + source = ChSrc, + user = User, + queue_collector_pid = CollectorPid, + virtual_host = VHostPath}}) -> handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'queue.bind_ok'{}); handle_method(#'queue.unbind'{} = Method, _, - State = #ch{conn_pid = ConnPid, - source = ChSrc, - user = User, - queue_collector_pid = CollectorPid, - virtual_host = VHostPath}) -> + State = #ch{cfg = #conf{conn_pid = ConnPid, + source = ChSrc, + user = User, + queue_collector_pid = CollectorPid, + virtual_host = VHostPath}}) -> handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, false, #'queue.unbind_ok'{}); handle_method(#'queue.purge'{nowait = NoWait} = Method, - _, State = #ch{conn_pid = ConnPid, - source = ChSrc, - user = User, - queue_collector_pid = CollectorPid, - virtual_host = VHostPath}) -> + _, State = #ch{cfg = #conf{conn_pid = ConnPid, + source = ChSrc, + user = User, + queue_collector_pid = CollectorPid, + virtual_host = VHostPath}}) -> case handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User) of {ok, PurgedMessageCount} -> @@ -1690,11 +1734,11 @@ handle_method(_MethodRecord, _Content, _State) -> %% for why. basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, ExclusiveConsume, Args, NoWait, - State = #ch{conn_pid = ConnPid, - limiter = Limiter, + State = #ch{cfg = #conf{conn_pid = ConnPid, + user = #user{username = Username}}, + limiter = Limiter, consumer_mapping = ConsumerMapping, - user = #user{username = Username}, - queue_states = QueueStates0}) -> + queue_states = QueueStates0}) -> case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, fun (Q) -> @@ -1814,8 +1858,9 @@ handle_consuming_queue_down_or_eol(QRef, %% not an HA failover. But the likelihood is not great and most users %% are unlikely to care. -cancel_consumer(CTag, QName, State = #ch{capabilities = Capabilities, - consumer_mapping = CMap}) -> +cancel_consumer(CTag, QName, + State = #ch{cfg = #conf{capabilities = Capabilities}, + consumer_mapping = CMap}) -> case rabbit_misc:table_lookup( Capabilities, <<"consumer_cancel_notify">>) of {bool, true} -> ok = send(#'basic.cancel'{consumer_tag = CTag, @@ -1885,7 +1930,8 @@ binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0, basic_return(#basic_message{exchange_name = ExchangeName, routing_keys = [RoutingKey | _CcRoutes], content = Content}, - State = #ch{protocol = Protocol, writer_pid = WriterPid}, + State = #ch{cfg = #conf{protocol = Protocol, + writer_pid = WriterPid}}, Reason) -> ?INCR_STATS(exchange_stats, ExchangeName, 1, return_unroutable, State), {_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason), @@ -1920,12 +1966,14 @@ internal_reject(Requeue, Acked, Limiter, record_sent(Type, Tag, AckRequired, Msg = {QName, QPid, MsgId, Redelivered, _Message}, - State = #ch{unacked_message_q = UAMQ, - next_tag = DeliveryTag, - trace_state = TraceState, - user = #user{username = Username}, - conn_name = ConnName, - channel = ChannelNum}) -> + State = #ch{cfg = #conf{channel = ChannelNum, + trace_state = TraceState, + user = #user{username = Username}, + conn_name = ConnName + }, + unacked_message_q = UAMQ, + next_tag = DeliveryTag + }) -> ?INCR_STATS(queue_stats, QName, 1, case {Type, AckRequired} of {get, true} -> get; {get, false} -> get_no_ack; @@ -1936,10 +1984,11 @@ record_sent(Type, Tag, AckRequired, true -> ?INCR_STATS(queue_stats, QName, 1, redeliver, State); false -> ok end, + DeliveredAt = os:system_time(millisecond), rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState), UAMQ1 = case AckRequired of - true -> ?QUEUE:in({DeliveryTag, Tag, {QPid, MsgId}}, - UAMQ); + true -> ?QUEUE:in({DeliveryTag, Tag, DeliveredAt, + {QPid, MsgId}}, UAMQ); false -> UAMQ end, State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}. @@ -1952,7 +2001,7 @@ collect_acks(Q, DeliveryTag, Multiple) -> collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> case ?QUEUE:out(Q) of - {{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}}, + {{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Time, _Msg}}, QTail} -> if CurrentDeliveryTag == DeliveryTag -> {[UnackedMsg | ToAcc], @@ -2007,9 +2056,10 @@ incr_queue_stats(QPid, QNames, MsgIds, State) -> %% ack first" order. new_tx() -> {?QUEUE:new(), []}. -notify_queues(State = #ch{state = closing}) -> +notify_queues(State = #ch{cfg = #conf{state = closing}}) -> {ok, State}; notify_queues(State = #ch{consumer_mapping = Consumers, + cfg = Cfg, delivering_queues = DQ }) -> QRefs0 = sets:to_list( sets:union(sets:from_list(consumer_queue_refs(Consumers)), DQ)), @@ -2017,15 +2067,15 @@ notify_queues(State = #ch{consumer_mapping = Consumers, QPids = [P || P <- QRefs0, ?IS_CLASSIC(P)], Timeout = get_operation_timeout(), {rabbit_amqqueue:notify_down_all(QPids, self(), Timeout), - State#ch{state = closing}}. + State#ch{cfg = Cfg#conf{state = closing}}}. foreach_per_queue(_F, [], Acc) -> Acc; -foreach_per_queue(F, [{_DTag, CTag, {QPid, MsgId}}], Acc) -> +foreach_per_queue(F, [{_DTag, CTag, _Time, {QPid, MsgId}}], Acc) -> %% quorum queue, needs the consumer tag F({QPid, CTag}, [MsgId], Acc); foreach_per_queue(F, UAL, Acc) -> - T = lists:foldl(fun ({_DTag, CTag, {QPid, MsgId}}, T) -> + T = lists:foldl(fun ({_DTag, CTag, _Time, {QPid, MsgId}}, T) -> rabbit_misc:gb_trees_cons({QPid, CTag}, MsgId, T) end, gb_trees:empty(), UAL), rabbit_misc:gb_trees_fold(fun (Key, Val, Acc0) -> F(Key, Val, Acc0) end, Acc, T). @@ -2167,7 +2217,7 @@ send_confirms_and_nacks(State) -> send_nacks([], _, State) -> State; -send_nacks(_Rs, _, State = #ch{state = closing}) -> %% optimisation +send_nacks(_Rs, _, State = #ch{cfg = #conf{state = closing}}) -> %% optimisation State; send_nacks(Rs, Cs, State) -> coalesce_and_send(Rs, Cs, @@ -2178,7 +2228,7 @@ send_nacks(Rs, Cs, State) -> send_confirms([], _, State) -> State; -send_confirms(_Cs, _, State = #ch{state = closing}) -> %% optimisation +send_confirms(_Cs, _, State = #ch{cfg = #conf{state = closing}}) -> %% optimisation State; send_confirms([MsgSeqNo], _, State) -> ok = send(#'basic.ack'{delivery_tag = MsgSeqNo}, State), @@ -2243,14 +2293,14 @@ infos(Items, Deadline, State) -> end || Item <- Items]. i(pid, _) -> self(); -i(connection, #ch{conn_pid = ConnPid}) -> ConnPid; -i(number, #ch{channel = Channel}) -> Channel; -i(user, #ch{user = User}) -> User#user.username; +i(connection, #ch{cfg = #conf{conn_pid = ConnPid}}) -> ConnPid; +i(number, #ch{cfg = #conf{channel = Channel}}) -> Channel; +i(user, #ch{cfg = #conf{user = User}}) -> User#user.username; i(user_who_performed_action, Ch) -> i(user, Ch); -i(vhost, #ch{virtual_host = VHost}) -> VHost; +i(vhost, #ch{cfg = #conf{virtual_host = VHost}}) -> VHost; i(transactional, #ch{tx = Tx}) -> Tx =/= none; i(confirm, #ch{confirm_enabled = CE}) -> CE; -i(source, #ch{source = ChSrc}) -> ChSrc; +i(source, #ch{cfg = #conf{source = ChSrc}}) -> ChSrc; i(name, State) -> name(State); i(consumer_count, #ch{consumer_mapping = CM}) -> maps:size(CM); i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC); @@ -2259,9 +2309,9 @@ i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> ?QUEUE:len(Msgs); i(messages_uncommitted, #ch{}) -> 0; i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks); i(acks_uncommitted, #ch{}) -> 0; -i(state, #ch{state = running}) -> credit_flow:state(); -i(state, #ch{state = State}) -> State; -i(prefetch_count, #ch{consumer_prefetch = C}) -> C; +i(state, #ch{cfg = #conf{state = running}}) -> credit_flow:state(); +i(state, #ch{cfg = #conf{state = State}}) -> State; +i(prefetch_count, #ch{cfg = #conf{consumer_prefetch = C}}) -> C; i(global_prefetch_count, #ch{limiter = Limiter}) -> rabbit_limiter:get_prefetch_limit(Limiter); i(interceptors, #ch{interceptor_state = IState}) -> @@ -2274,7 +2324,7 @@ i(reductions, _State) -> i(Item, _) -> throw({bad_argument, Item}). -name(#ch{conn_name = ConnName, channel = Channel}) -> +name(#ch{cfg = #conf{conn_name = ConnName, channel = Channel}}) -> list_to_binary(rabbit_misc:format("~s (~p)", [ConnName, Channel])). emit_stats(State) -> emit_stats(State, []). @@ -2297,9 +2347,9 @@ erase_queue_stats(QName) -> end || {{queue_exchange_stats, QX = {QName0, _}}, _} <- get(), QName0 =:= QName]. -get_vhost(#ch{virtual_host = VHost}) -> VHost. +get_vhost(#ch{cfg = #conf{virtual_host = VHost}}) -> VHost. -get_user(#ch{user = User}) -> User. +get_user(#ch{cfg = #conf{user = User}}) -> User. delete_stats({queue_stats, QName}) -> rabbit_core_metrics:channel_queue_down({self(), QName}); @@ -2559,7 +2609,7 @@ handle_deliver(ConsumerTag, AckRequired, #basic_message{exchange_name = ExchangeName, routing_keys = [RoutingKey | _CcRoutes], content = Content}}, - State = #ch{writer_pid = WriterPid, + State = #ch{cfg = #conf{writer_pid = WriterPid}, next_tag = DeliveryTag}) -> Deliver = #'basic.deliver'{consumer_tag = ConsumerTag, delivery_tag = DeliveryTag, @@ -2592,9 +2642,29 @@ handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount, State1 = track_delivering_queue(NoAck, QPid, QName, State), {noreply, record_sent(get, DeliveryTag, not(NoAck), Msg, State1)}. -init_queue_cleanup_timer(State) -> - {ok, Interval} = application:get_env(rabbit, channel_queue_cleanup_interval), - State#ch{queue_cleanup_timer = erlang:send_after(Interval, self(), queue_cleanup)}. +init_tick_timer(State = #ch{tick_timer = undefined}) -> + {ok, Interval} = application:get_env(rabbit, channel_tick_interval), + State#ch{tick_timer = erlang:send_after(Interval, self(), tick)}; +init_tick_timer(State) -> + State. + +reset_tick_timer(State) -> + State#ch{tick_timer = undefined}. + +maybe_cancel_tick_timer(#ch{tick_timer = undefined} = State) -> + State; +maybe_cancel_tick_timer(#ch{tick_timer = TRef, + unacked_message_q = UMQ} = State) -> + case ?QUEUE:len(UMQ) of + 0 -> + %% we can only cancel the tick timer if the unacked messages + %% queue is empty. + _ = erlang:cancel_timer(TRef), + State#ch{tick_timer = undefined}; + _ -> + %% let the timer continue + State + end. %% only classic queues need monitoring so rather than special casing %% everywhere monitors are set up we wrap it here for this module @@ -2612,7 +2682,7 @@ add_delivery_count_header(#{delivery_count := Count}, Msg) -> add_delivery_count_header(_, Msg) -> Msg. -qpid_to_ref(Pid) when is_pid(Pid) -> Pid; +qpid_to_ref(Pid) when is_pid(Pid) -> Pid; qpid_to_ref({Name, _}) -> Name; %% assume it already is a ref qpid_to_ref(Ref) -> Ref. @@ -2632,3 +2702,60 @@ queue_fold(Fun, Init, Q) -> {empty, _Q} -> Init; {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1) end. + +evaluate_consumer_timeout(State0 = #ch{cfg = #conf{channel = Channel, + capabilities = Capabilities, + consumer_timeout = Timeout}, + queue_names = QNames, + queue_consumers = QCons, + unacked_message_q = UAMQ}) -> + Now = os:system_time(millisecond), + case ?QUEUE:peek(UAMQ) of + {value, {_DTag, ConsumerTag, Time, {QPid, _Msg}}} + when is_integer(Timeout) + andalso Time < Now - Timeout -> + rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out " + "waiting on ack", + [rabbit_data_coercion:to_binary(ConsumerTag), + Channel]), + SupportsCancel = case rabbit_misc:table_lookup( + Capabilities, + <<"consumer_cancel_notify">>) of + {bool, true} when is_binary(ConsumerTag) -> + true; + _ -> false + end, + case SupportsCancel of + false -> + Ex = rabbit_misc:amqp_error(precondition_failed, + "consumer ack timed out on channel ~w", + [Channel], none), + handle_exception(Ex, State0); + true -> + QRef = qpid_to_ref(QPid), + QName = maps:get(QRef, QNames), + %% cancel the consumer with the client + State2 = cancel_consumer(ConsumerTag, QName, State0), + [Q] = rabbit_amqqueue:lookup([QName]), + %% send basic cancel to the queue + {ok, QueueStates2} = rabbit_amqqueue:basic_cancel( + Q, self(), ConsumerTag, undefined, + <<"broker">>, State2#ch.queue_states), + %% return all in-flight messages for the consumer + {MsgIds, Rem} = lists:foldl( + fun({_DelTag, ConTag, _Time, {_, MsgId}}, + {Ids, Rem}) when ConTag == ConsumerTag -> + {[MsgId | Ids], Rem}; + (Unacked, {Ids, Rem}) -> + {Ids, ?QUEUE:in(Unacked, Rem)} + end, {[], ?QUEUE:new()}, + ?QUEUE:to_list(UAMQ)), + QueueStates = rabbit_amqqueue:requeue(QPid, {ConsumerTag, MsgIds}, + self(), QueueStates2), + {noreply, State2#ch{queue_states = QueueStates, + queue_consumers = maps:remove(QRef, QCons), + unacked_message_q = Rem}} + end; + _ -> + {noreply, State0} + end. |
