summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2019-05-03 13:27:29 +0300
committerGitHub <noreply@github.com>2019-05-03 13:27:29 +0300
commit36972c3dae8ec81d3227fb4dd8ee9685cf69651f (patch)
tree16f5603a15b5f5ed33468b7a5dd3cc4321ac16ba /src
parent3976d87c9a3e2e4e45a7519ab52157a82524fd0a (diff)
parent1bdd460e60d758412ef2a5dc9d3861136e05833f (diff)
downloadrabbitmq-server-git-36972c3dae8ec81d3227fb4dd8ee9685cf69651f.tar.gz
Merge pull request #1998 from rabbitmq/consumer_timeouts_take_2
Always close channel on consumer timeout
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl55
1 files changed, 9 insertions, 46 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 69add85057..19d4449e21 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -2712,58 +2712,21 @@ queue_fold(Fun, Init, Q) ->
end.
evaluate_consumer_timeout(State0 = #ch{cfg = #conf{channel = Channel,
- capabilities = Capabilities,
consumer_timeout = Timeout},
- queue_names = QNames,
- queue_consumers = QCons,
unacked_message_q = UAMQ}) ->
Now = os:system_time(millisecond),
case ?QUEUE:peek(UAMQ) of
- {value, {_DTag, ConsumerTag, Time, {QPid, _Msg}}}
+ {value, {_DTag, ConsumerTag, Time, {_QPid, _Msg}}}
when is_integer(Timeout)
andalso Time < Now - Timeout ->
- rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out "
- "waiting on ack",
- [rabbit_data_coercion:to_binary(ConsumerTag),
- Channel]),
- SupportsCancel = case rabbit_misc:table_lookup(
- Capabilities,
- <<"consumer_cancel_notify">>) of
- {bool, true} when is_binary(ConsumerTag) ->
- true;
- _ -> false
- end,
- case SupportsCancel of
- false ->
- Ex = rabbit_misc:amqp_error(precondition_failed,
- "consumer ack timed out on channel ~w",
- [Channel], none),
- handle_exception(Ex, State0);
- true ->
- QRef = qpid_to_ref(QPid),
- QName = maps:get(QRef, QNames),
- %% cancel the consumer with the client
- State2 = cancel_consumer(ConsumerTag, QName, State0),
- [Q] = rabbit_amqqueue:lookup([QName]),
- %% send basic cancel to the queue
- {ok, QueueStates2} = rabbit_amqqueue:basic_cancel(
- Q, self(), ConsumerTag, undefined,
- <<"broker">>, State2#ch.queue_states),
- %% return all in-flight messages for the consumer
- {MsgIds, Rem} = lists:foldl(
- fun({_DelTag, ConTag, _Time, {_, MsgId}},
- {Ids, Rem}) when ConTag == ConsumerTag ->
- {[MsgId | Ids], Rem};
- (Unacked, {Ids, Rem}) ->
- {Ids, ?QUEUE:in(Unacked, Rem)}
- end, {[], ?QUEUE:new()},
- ?QUEUE:to_list(UAMQ)),
- QueueStates = rabbit_amqqueue:requeue(QPid, {ConsumerTag, MsgIds},
- self(), QueueStates2),
- {noreply, State2#ch{queue_states = QueueStates,
- queue_consumers = maps:remove(QRef, QCons),
- unacked_message_q = Rem}}
- end;
+ rabbit_log_channel:warning("Consumer ~s on channel ~w has timed out "
+ "waiting on consumer acknowledgement. Timeout used: ~p ms",
+ [rabbit_data_coercion:to_binary(ConsumerTag),
+ Channel, Timeout]),
+ Ex = rabbit_misc:amqp_error(precondition_failed,
+ "consumer ack timed out on channel ~w",
+ [Channel], none),
+ handle_exception(Ex, State0);
_ ->
{noreply, State0}
end.