summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-05-01 14:00:51 +0100
committerkjnilsson <knilsson@pivotal.io>2019-05-01 14:11:05 +0100
commit9933cc44876cd5b277a0ecf3c7a2e31f636384c7 (patch)
treec1b78c4298d4c9bdb1699b0f9d9126717550143e /src
parent45402b9454a12f6b1a9be3ff1827ed4ee74da7d1 (diff)
downloadrabbitmq-server-git-9933cc44876cd5b277a0ecf3c7a2e31f636384c7.tar.gz
Always close channel on consumer timeout
In order to keep things simple and consistent this changes consumer timeouts to always close the channel instead of cancelling and returning.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl47
1 files changed, 5 insertions, 42 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 69add85057..048254f5e9 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -2712,58 +2712,21 @@ queue_fold(Fun, Init, Q) ->
end.
evaluate_consumer_timeout(State0 = #ch{cfg = #conf{channel = Channel,
- capabilities = Capabilities,
consumer_timeout = Timeout},
- queue_names = QNames,
- queue_consumers = QCons,
unacked_message_q = UAMQ}) ->
Now = os:system_time(millisecond),
case ?QUEUE:peek(UAMQ) of
- {value, {_DTag, ConsumerTag, Time, {QPid, _Msg}}}
+ {value, {_DTag, ConsumerTag, Time, {_QPid, _Msg}}}
when is_integer(Timeout)
andalso Time < Now - Timeout ->
rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out "
"waiting on ack",
[rabbit_data_coercion:to_binary(ConsumerTag),
Channel]),
- SupportsCancel = case rabbit_misc:table_lookup(
- Capabilities,
- <<"consumer_cancel_notify">>) of
- {bool, true} when is_binary(ConsumerTag) ->
- true;
- _ -> false
- end,
- case SupportsCancel of
- false ->
- Ex = rabbit_misc:amqp_error(precondition_failed,
- "consumer ack timed out on channel ~w",
- [Channel], none),
- handle_exception(Ex, State0);
- true ->
- QRef = qpid_to_ref(QPid),
- QName = maps:get(QRef, QNames),
- %% cancel the consumer with the client
- State2 = cancel_consumer(ConsumerTag, QName, State0),
- [Q] = rabbit_amqqueue:lookup([QName]),
- %% send basic cancel to the queue
- {ok, QueueStates2} = rabbit_amqqueue:basic_cancel(
- Q, self(), ConsumerTag, undefined,
- <<"broker">>, State2#ch.queue_states),
- %% return all in-flight messages for the consumer
- {MsgIds, Rem} = lists:foldl(
- fun({_DelTag, ConTag, _Time, {_, MsgId}},
- {Ids, Rem}) when ConTag == ConsumerTag ->
- {[MsgId | Ids], Rem};
- (Unacked, {Ids, Rem}) ->
- {Ids, ?QUEUE:in(Unacked, Rem)}
- end, {[], ?QUEUE:new()},
- ?QUEUE:to_list(UAMQ)),
- QueueStates = rabbit_amqqueue:requeue(QPid, {ConsumerTag, MsgIds},
- self(), QueueStates2),
- {noreply, State2#ch{queue_states = QueueStates,
- queue_consumers = maps:remove(QRef, QCons),
- unacked_message_q = Rem}}
- end;
+ Ex = rabbit_misc:amqp_error(precondition_failed,
+ "consumer ack timed out on channel ~w",
+ [Channel], none),
+ handle_exception(Ex, State0);
_ ->
{noreply, State0}
end.