diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 82 |
2 files changed, 71 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index fac9d5e50f..8855cfede4 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -74,6 +74,7 @@ -export_type([name/0, qmsg/0, absent_reason/0]). -type name() :: rabbit_types:r('queue'). + -type qpids() :: [pid()]. -type qlen() :: rabbit_types:ok(non_neg_integer()). -type qfun(A) :: fun ((amqqueue:amqqueue()) -> A | no_return()). @@ -1229,12 +1230,11 @@ purge(Q) when ?amqqueue_is_quorum(Q) -> NodeId = amqqueue:get_pid(Q), rabbit_quorum_queue:purge(NodeId). --spec requeue(pid(), +-spec requeue(pid() | amqqueue:ra_server_id(), {rabbit_fifo:consumer_tag(), [msg_id()]}, pid(), quorum_states()) -> 'ok'. - requeue(QPid, {_, MsgIds}, ChPid, QuorumStates) when ?IS_CLASSIC(QPid) -> ok = delegate:invoke(QPid, {gen_server2, call, [{requeue, MsgIds, ChPid}, infinity]}), QuorumStates; diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 59a8cce371..32579050db 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -164,7 +164,8 @@ queue_states, queue_cleanup_timer, %% Message content size limit - max_message_size + max_message_size, + consumer_timeout }). -define(QUEUE, lqueue). @@ -483,6 +484,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, Limiter0 end, MaxMessageSize = get_max_message_size(), + ConsumerTimeout = get_consumer_timeout(), State = #ch{state = starting, protocol = Protocol, channel = Channel, @@ -515,7 +517,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, delivery_flow = Flow, interceptor_state = undefined, queue_states = #{}, - max_message_size = MaxMessageSize}, + max_message_size = MaxMessageSize, + consumer_timeout = ConsumerTimeout}, State1 = State#ch{ interceptor_state = rabbit_channel_interceptor:init(State)}, State2 = rabbit_event:init_stats_timer(State1, #ch.stats_timer), @@ -811,14 +814,56 @@ handle_info({{Ref, Node}, LateAnswer}, State = #ch{channel = Channel}) [Channel, LateAnswer, Node]), noreply(State); -handle_info(queue_cleanup, State = #ch{queue_states = QueueStates0}) -> - QueueStates = +handle_info(queue_cleanup, State0 = #ch{channel = Channel, + queue_states = QueueStates0, + queue_names = QNames, + queue_consumers = QCons, + consumer_timeout = Timeout, + unacked_message_q = UAMQ}) -> + QueueStates1 = maps:filter(fun(_, QS) -> QName = rabbit_quorum_queue:queue_name(QS), [] /= rabbit_amqqueue:lookup(QName) end, QueueStates0), - noreply(init_queue_cleanup_timer(State#ch{queue_states = QueueStates})); - + Now = os:system_time(millisecond), + case ?QUEUE:peek(UAMQ) of + {value, {_DTag, ConsumerTag, Time, {QPid, _Msg}}} + when is_integer(Timeout) andalso Time < Now - Timeout -> + rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out " + "waiting on ack", [ConsumerTag, Channel]), + %% Cancel consumer both to the client and queue + %% and return all messages to the queue + QRef = qpid_to_ref(QPid), + QName = maps:get(QRef, QNames), + %% cancel the consumer with the client + State1 = cancel_consumer(ConsumerTag, QName, State0), + [Q] = rabbit_amqqueue:lookup([QName]), + %% send basic cancel to the queue + {ok, QueueStates2} = rabbit_amqqueue:basic_cancel( + Q, self(), ConsumerTag, undefined, + <<"broker">>, QueueStates1), + %% return all in-flight messages + {MsgIds, Rem} = lists:foldl( + fun({_DelTag, ConTag, _Time, {_, MsgId}}, + {Ids, Rem}) + when ConTag == ConsumerTag -> + {[MsgId | Ids], Rem}; + (Unacked, {Ids, Rem}) -> + {Ids, ?QUEUE:in(Unacked, Rem)} + end, {[], ?QUEUE:new()}, + ?QUEUE:to_list(UAMQ)), + QueueStates = rabbit_amqqueue:requeue(QPid, {ConsumerTag, MsgIds}, + self(), QueueStates2), + + State = State1#ch{queue_states = QueueStates, + queue_consumers = maps:remove(QRef, QCons), + unacked_message_q = Rem}, + noreply(init_queue_cleanup_timer(State)); + _ -> + noreply( + init_queue_cleanup_timer( + State0#ch{queue_states = QueueStates1})) + end; handle_info({channel_source, Source}, State = #ch{}) -> noreply(State#ch{source = Source}). @@ -857,6 +902,13 @@ get_max_message_size() -> ?MAX_MSG_SIZE end. +get_consumer_timeout() -> + case application:get_env(rabbit, consumer_timeout) of + {ok, MS} when is_integer(MS) -> + MS; + _ -> + undefined + end. %%--------------------------------------------------------------------------- reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}. @@ -1818,7 +1870,10 @@ cancel_consumer(CTag, QName, State = #ch{capabilities = Capabilities, consumer_mapping = CMap}) -> case rabbit_misc:table_lookup( Capabilities, <<"consumer_cancel_notify">>) of - {bool, true} -> ok = send(#'basic.cancel'{consumer_tag = CTag, + {bool, true} -> ok = + + rabbit_log:info("Consumer cancel notify suppoerted ~w", [CTag]), + send(#'basic.cancel'{consumer_tag = CTag, nowait = true}, State); _ -> ok end, @@ -1936,10 +1991,11 @@ record_sent(Type, Tag, AckRequired, true -> ?INCR_STATS(queue_stats, QName, 1, redeliver, State); false -> ok end, + DeliveredAt = os:system_time(millisecond), rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState), UAMQ1 = case AckRequired of - true -> ?QUEUE:in({DeliveryTag, Tag, {QPid, MsgId}}, - UAMQ); + true -> ?QUEUE:in({DeliveryTag, Tag, DeliveredAt, + {QPid, MsgId}}, UAMQ); false -> UAMQ end, State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}. @@ -1952,7 +2008,7 @@ collect_acks(Q, DeliveryTag, Multiple) -> collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> case ?QUEUE:out(Q) of - {{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}}, + {{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Time, _Msg}}, QTail} -> if CurrentDeliveryTag == DeliveryTag -> {[UnackedMsg | ToAcc], @@ -2021,11 +2077,11 @@ notify_queues(State = #ch{consumer_mapping = Consumers, foreach_per_queue(_F, [], Acc) -> Acc; -foreach_per_queue(F, [{_DTag, CTag, {QPid, MsgId}}], Acc) -> +foreach_per_queue(F, [{_DTag, CTag, _Time, {QPid, MsgId}}], Acc) -> %% quorum queue, needs the consumer tag F({QPid, CTag}, [MsgId], Acc); foreach_per_queue(F, UAL, Acc) -> - T = lists:foldl(fun ({_DTag, CTag, {QPid, MsgId}}, T) -> + T = lists:foldl(fun ({_DTag, CTag, _Time, {QPid, MsgId}}, T) -> rabbit_misc:gb_trees_cons({QPid, CTag}, MsgId, T) end, gb_trees:empty(), UAL), rabbit_misc:gb_trees_fold(fun (Key, Val, Acc0) -> F(Key, Val, Acc0) end, Acc, T). @@ -2612,7 +2668,7 @@ add_delivery_count_header(#{delivery_count := Count}, Msg) -> add_delivery_count_header(_, Msg) -> Msg. -qpid_to_ref(Pid) when is_pid(Pid) -> Pid; +qpid_to_ref(Pid) when is_pid(Pid) -> Pid; qpid_to_ref({Name, _}) -> Name; %% assume it already is a ref qpid_to_ref(Ref) -> Ref. |
