summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-04-08 16:12:26 +0100
committerkjnilsson <knilsson@pivotal.io>2019-04-23 14:36:10 +0100
commitec76fef8a84f4dee56e422564cac5634ed89976f (patch)
tree1b2bfffa813c3c376fe22bbb608f05a159e982a2 /src
parentd389d045df3f60bb19511d3240f3cee7c1de534c (diff)
downloadrabbitmq-server-git-ec76fef8a84f4dee56e422564cac5634ed89976f.tar.gz
Move consumer timeout tests to own SUITE
Also handle case where client does not support consumer cancellation and rename the queue_cleanup timer to a generic "tick" timer for channels to perform periodic activities. [#164212469]
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl62
1 files changed, 30 insertions, 32 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a4d3ec6321..b705dbcbdc 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -169,7 +169,7 @@
delivery_flow,
interceptor_state,
queue_states,
- queue_cleanup_timer
+ tick_timer
}).
-define(QUEUE, lqueue).
@@ -489,7 +489,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
end,
MaxMessageSize = get_max_message_size(),
ConsumerTimeout = get_consumer_timeout(),
- rabbit_log:info("consumer timeout ~w", [ConsumerTimeout]),
State = #ch{cfg = #conf{state = starting,
protocol = Protocol,
channel = Channel,
@@ -535,7 +534,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
rabbit_event:if_enabled(State2, #ch.stats_timer,
fun() -> emit_stats(State2) end),
put_operation_timeout(),
- State3 = init_queue_cleanup_timer(State2),
+ State3 = init_tick_timer(State2),
{ok, State3, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -828,12 +827,13 @@ handle_info({{Ref, Node}, LateAnswer},
[Channel, LateAnswer, Node]),
noreply(State);
-handle_info(queue_cleanup, State0 = #ch{cfg = #conf{channel = Channel,
- consumer_timeout = Timeout},
- queue_states = QueueStates0,
- queue_names = QNames,
- queue_consumers = QCons,
- unacked_message_q = UAMQ}) ->
+handle_info(tick, State0 = #ch{cfg = #conf{channel = Channel,
+ capabilities = Capabilities,
+ consumer_timeout = Timeout},
+ queue_states = QueueStates0,
+ queue_names = QNames,
+ queue_consumers = QCons,
+ unacked_message_q = UAMQ}) ->
QueueStates1 =
maps:filter(fun(_, QS) ->
QName = rabbit_quorum_queue:queue_name(QS),
@@ -845,22 +845,24 @@ handle_info(queue_cleanup, State0 = #ch{cfg = #conf{channel = Channel,
{value, {_DTag, ConsumerTag, Time, {QPid, _Msg}}}
when is_integer(Timeout)
andalso Time < Now - Timeout ->
- case ConsumerTag of
- _ when is_integer(ConsumerTag) ->
- %% basic.get - there is no mechanims so we just crash the
- %% channel
+ rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out "
+ "waiting on ack",
+ [rabbit_data_coercion:to_binary(ConsumerTag),
+ Channel]),
+ SupportsCancel = case rabbit_misc:table_lookup(
+ Capabilities,
+ <<"consumer_cancel_notify">>) of
+ {bool, true} when is_binary(ConsumerTag) ->
+ true;
+ _ -> false
+ end,
+ case SupportsCancel of
+ false ->
Ex = rabbit_misc:amqp_error(precondition_failed,
- "basic.get ack timed out on channel ~w",
+ "consumer ack timed out on channel ~w",
[Channel], none),
handle_exception(Ex, State0);
- % rabbit_misc:protocol_error(precondition_failed,
- % "basic.get ack timed out on channel ~w ",
- % [Channel]);
- _ ->
- rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out "
- "waiting on ack",
- [rabbit_data_coercion:to_binary(ConsumerTag),
- Channel]),
+ true ->
QRef = qpid_to_ref(QPid),
QName = maps:get(QRef, QNames),
%% cancel the consumer with the client
@@ -881,15 +883,14 @@ handle_info(queue_cleanup, State0 = #ch{cfg = #conf{channel = Channel,
?QUEUE:to_list(UAMQ)),
QueueStates = rabbit_amqqueue:requeue(QPid, {ConsumerTag, MsgIds},
self(), QueueStates2),
-
State = State1#ch{queue_states = QueueStates,
queue_consumers = maps:remove(QRef, QCons),
unacked_message_q = Rem},
- noreply(init_queue_cleanup_timer(State))
+ noreply(init_tick_timer(State))
end;
_ ->
noreply(
- init_queue_cleanup_timer(
+ init_tick_timer(
State0#ch{queue_states = QueueStates1}))
end;
handle_info({channel_source, Source}, State = #ch{cfg = Cfg}) ->
@@ -1910,10 +1911,7 @@ cancel_consumer(CTag, QName,
consumer_mapping = CMap}) ->
case rabbit_misc:table_lookup(
Capabilities, <<"consumer_cancel_notify">>) of
- {bool, true} -> ok =
-
- rabbit_log:info("Consumer cancel notify suppoerted ~w", [CTag]),
- send(#'basic.cancel'{consumer_tag = CTag,
+ {bool, true} -> ok = send(#'basic.cancel'{consumer_tag = CTag,
nowait = true}, State);
_ -> ok
end,
@@ -2692,9 +2690,9 @@ handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount,
State1 = track_delivering_queue(NoAck, QPid, QName, State),
{noreply, record_sent(get, DeliveryTag, not(NoAck), Msg, State1)}.
-init_queue_cleanup_timer(State) ->
- {ok, Interval} = application:get_env(rabbit, channel_queue_cleanup_interval),
- State#ch{queue_cleanup_timer = erlang:send_after(Interval, self(), queue_cleanup)}.
+init_tick_timer(State) ->
+ {ok, Interval} = application:get_env(rabbit, channel_tick_interval),
+ State#ch{tick_timer = erlang:send_after(Interval, self(), tick)}.
%% only classic queues need monitoring so rather than special casing
%% everywhere monitors are set up we wrap it here for this module