diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-04-08 16:12:26 +0100 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-04-23 14:36:10 +0100 |
| commit | ec76fef8a84f4dee56e422564cac5634ed89976f (patch) | |
| tree | 1b2bfffa813c3c376fe22bbb608f05a159e982a2 /src | |
| parent | d389d045df3f60bb19511d3240f3cee7c1de534c (diff) | |
| download | rabbitmq-server-git-ec76fef8a84f4dee56e422564cac5634ed89976f.tar.gz | |
Move consumer timeout tests to own SUITE
Also handle case where client does not support consumer cancellation and
rename the queue_cleanup timer to a generic "tick" timer for channels to
perform periodic activities.
[#164212469]
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 62 |
1 files changed, 30 insertions, 32 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a4d3ec6321..b705dbcbdc 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -169,7 +169,7 @@ delivery_flow, interceptor_state, queue_states, - queue_cleanup_timer + tick_timer }). -define(QUEUE, lqueue). @@ -489,7 +489,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, end, MaxMessageSize = get_max_message_size(), ConsumerTimeout = get_consumer_timeout(), - rabbit_log:info("consumer timeout ~w", [ConsumerTimeout]), State = #ch{cfg = #conf{state = starting, protocol = Protocol, channel = Channel, @@ -535,7 +534,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, rabbit_event:if_enabled(State2, #ch.stats_timer, fun() -> emit_stats(State2) end), put_operation_timeout(), - State3 = init_queue_cleanup_timer(State2), + State3 = init_tick_timer(State2), {ok, State3, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -828,12 +827,13 @@ handle_info({{Ref, Node}, LateAnswer}, [Channel, LateAnswer, Node]), noreply(State); -handle_info(queue_cleanup, State0 = #ch{cfg = #conf{channel = Channel, - consumer_timeout = Timeout}, - queue_states = QueueStates0, - queue_names = QNames, - queue_consumers = QCons, - unacked_message_q = UAMQ}) -> +handle_info(tick, State0 = #ch{cfg = #conf{channel = Channel, + capabilities = Capabilities, + consumer_timeout = Timeout}, + queue_states = QueueStates0, + queue_names = QNames, + queue_consumers = QCons, + unacked_message_q = UAMQ}) -> QueueStates1 = maps:filter(fun(_, QS) -> QName = rabbit_quorum_queue:queue_name(QS), @@ -845,22 +845,24 @@ handle_info(queue_cleanup, State0 = #ch{cfg = #conf{channel = Channel, {value, {_DTag, ConsumerTag, Time, {QPid, _Msg}}} when is_integer(Timeout) andalso Time < Now - Timeout -> - case ConsumerTag of - _ when is_integer(ConsumerTag) -> - %% basic.get - there is no mechanims so we just crash the - %% channel + rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out " + "waiting on ack", + [rabbit_data_coercion:to_binary(ConsumerTag), + Channel]), + SupportsCancel = case rabbit_misc:table_lookup( + Capabilities, + <<"consumer_cancel_notify">>) of + {bool, true} when is_binary(ConsumerTag) -> + true; + _ -> false + end, + case SupportsCancel of + false -> Ex = rabbit_misc:amqp_error(precondition_failed, - "basic.get ack timed out on channel ~w", + "consumer ack timed out on channel ~w", [Channel], none), handle_exception(Ex, State0); - % rabbit_misc:protocol_error(precondition_failed, - % "basic.get ack timed out on channel ~w ", - % [Channel]); - _ -> - rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out " - "waiting on ack", - [rabbit_data_coercion:to_binary(ConsumerTag), - Channel]), + true -> QRef = qpid_to_ref(QPid), QName = maps:get(QRef, QNames), %% cancel the consumer with the client @@ -881,15 +883,14 @@ handle_info(queue_cleanup, State0 = #ch{cfg = #conf{channel = Channel, ?QUEUE:to_list(UAMQ)), QueueStates = rabbit_amqqueue:requeue(QPid, {ConsumerTag, MsgIds}, self(), QueueStates2), - State = State1#ch{queue_states = QueueStates, queue_consumers = maps:remove(QRef, QCons), unacked_message_q = Rem}, - noreply(init_queue_cleanup_timer(State)) + noreply(init_tick_timer(State)) end; _ -> noreply( - init_queue_cleanup_timer( + init_tick_timer( State0#ch{queue_states = QueueStates1})) end; handle_info({channel_source, Source}, State = #ch{cfg = Cfg}) -> @@ -1910,10 +1911,7 @@ cancel_consumer(CTag, QName, consumer_mapping = CMap}) -> case rabbit_misc:table_lookup( Capabilities, <<"consumer_cancel_notify">>) of - {bool, true} -> ok = - - rabbit_log:info("Consumer cancel notify suppoerted ~w", [CTag]), - send(#'basic.cancel'{consumer_tag = CTag, + {bool, true} -> ok = send(#'basic.cancel'{consumer_tag = CTag, nowait = true}, State); _ -> ok end, @@ -2692,9 +2690,9 @@ handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount, State1 = track_delivering_queue(NoAck, QPid, QName, State), {noreply, record_sent(get, DeliveryTag, not(NoAck), Msg, State1)}. -init_queue_cleanup_timer(State) -> - {ok, Interval} = application:get_env(rabbit, channel_queue_cleanup_interval), - State#ch{queue_cleanup_timer = erlang:send_after(Interval, self(), queue_cleanup)}. +init_tick_timer(State) -> + {ok, Interval} = application:get_env(rabbit, channel_tick_interval), + State#ch{tick_timer = erlang:send_after(Interval, self(), tick)}. %% only classic queues need monitoring so rather than special casing %% everywhere monitors are set up we wrap it here for this module |
