summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-04-03 14:53:01 +0100
committerkjnilsson <knilsson@pivotal.io>2019-04-23 14:36:10 +0100
commit04e4d0b5ba486abb16e873b4d3df6e8792b009e6 (patch)
treee468d787ee460d549d38f57b80290d4d37f31975
parent9d898823c30ff0e56d97053764b2708a299c46eb (diff)
downloadrabbitmq-server-git-04e4d0b5ba486abb16e873b4d3df6e8792b009e6.tar.gz
Implement consumer channel timeouts
Such that if a consumer has a message awaiting ack for longer than this timeout the consumer will either be cancelled (if the client supports it) or the channel will be closed to ensure the message does not get stuck permanently.
-rw-r--r--src/rabbit_amqqueue.erl4
-rw-r--r--src/rabbit_channel.erl82
-rw-r--r--test/queue_parallel_SUITE.erl50
3 files changed, 113 insertions, 23 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index fac9d5e50f..8855cfede4 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -74,6 +74,7 @@
-export_type([name/0, qmsg/0, absent_reason/0]).
-type name() :: rabbit_types:r('queue').
+
-type qpids() :: [pid()].
-type qlen() :: rabbit_types:ok(non_neg_integer()).
-type qfun(A) :: fun ((amqqueue:amqqueue()) -> A | no_return()).
@@ -1229,12 +1230,11 @@ purge(Q) when ?amqqueue_is_quorum(Q) ->
NodeId = amqqueue:get_pid(Q),
rabbit_quorum_queue:purge(NodeId).
--spec requeue(pid(),
+-spec requeue(pid() | amqqueue:ra_server_id(),
{rabbit_fifo:consumer_tag(), [msg_id()]},
pid(),
quorum_states()) ->
'ok'.
-
requeue(QPid, {_, MsgIds}, ChPid, QuorumStates) when ?IS_CLASSIC(QPid) ->
ok = delegate:invoke(QPid, {gen_server2, call, [{requeue, MsgIds, ChPid}, infinity]}),
QuorumStates;
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 59a8cce371..32579050db 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -164,7 +164,8 @@
queue_states,
queue_cleanup_timer,
%% Message content size limit
- max_message_size
+ max_message_size,
+ consumer_timeout
}).
-define(QUEUE, lqueue).
@@ -483,6 +484,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
Limiter0
end,
MaxMessageSize = get_max_message_size(),
+ ConsumerTimeout = get_consumer_timeout(),
State = #ch{state = starting,
protocol = Protocol,
channel = Channel,
@@ -515,7 +517,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
delivery_flow = Flow,
interceptor_state = undefined,
queue_states = #{},
- max_message_size = MaxMessageSize},
+ max_message_size = MaxMessageSize,
+ consumer_timeout = ConsumerTimeout},
State1 = State#ch{
interceptor_state = rabbit_channel_interceptor:init(State)},
State2 = rabbit_event:init_stats_timer(State1, #ch.stats_timer),
@@ -811,14 +814,56 @@ handle_info({{Ref, Node}, LateAnswer}, State = #ch{channel = Channel})
[Channel, LateAnswer, Node]),
noreply(State);
-handle_info(queue_cleanup, State = #ch{queue_states = QueueStates0}) ->
- QueueStates =
+handle_info(queue_cleanup, State0 = #ch{channel = Channel,
+ queue_states = QueueStates0,
+ queue_names = QNames,
+ queue_consumers = QCons,
+ consumer_timeout = Timeout,
+ unacked_message_q = UAMQ}) ->
+ QueueStates1 =
maps:filter(fun(_, QS) ->
QName = rabbit_quorum_queue:queue_name(QS),
[] /= rabbit_amqqueue:lookup(QName)
end, QueueStates0),
- noreply(init_queue_cleanup_timer(State#ch{queue_states = QueueStates}));
-
+ Now = os:system_time(millisecond),
+ case ?QUEUE:peek(UAMQ) of
+ {value, {_DTag, ConsumerTag, Time, {QPid, _Msg}}}
+ when is_integer(Timeout) andalso Time < Now - Timeout ->
+ rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out "
+ "waiting on ack", [ConsumerTag, Channel]),
+ %% Cancel consumer both to the client and queue
+ %% and return all messages to the queue
+ QRef = qpid_to_ref(QPid),
+ QName = maps:get(QRef, QNames),
+ %% cancel the consumer with the client
+ State1 = cancel_consumer(ConsumerTag, QName, State0),
+ [Q] = rabbit_amqqueue:lookup([QName]),
+ %% send basic cancel to the queue
+ {ok, QueueStates2} = rabbit_amqqueue:basic_cancel(
+ Q, self(), ConsumerTag, undefined,
+ <<"broker">>, QueueStates1),
+ %% return all in-flight messages
+ {MsgIds, Rem} = lists:foldl(
+ fun({_DelTag, ConTag, _Time, {_, MsgId}},
+ {Ids, Rem})
+ when ConTag == ConsumerTag ->
+ {[MsgId | Ids], Rem};
+ (Unacked, {Ids, Rem}) ->
+ {Ids, ?QUEUE:in(Unacked, Rem)}
+ end, {[], ?QUEUE:new()},
+ ?QUEUE:to_list(UAMQ)),
+ QueueStates = rabbit_amqqueue:requeue(QPid, {ConsumerTag, MsgIds},
+ self(), QueueStates2),
+
+ State = State1#ch{queue_states = QueueStates,
+ queue_consumers = maps:remove(QRef, QCons),
+ unacked_message_q = Rem},
+ noreply(init_queue_cleanup_timer(State));
+ _ ->
+ noreply(
+ init_queue_cleanup_timer(
+ State0#ch{queue_states = QueueStates1}))
+ end;
handle_info({channel_source, Source}, State = #ch{}) ->
noreply(State#ch{source = Source}).
@@ -857,6 +902,13 @@ get_max_message_size() ->
?MAX_MSG_SIZE
end.
+get_consumer_timeout() ->
+ case application:get_env(rabbit, consumer_timeout) of
+ {ok, MS} when is_integer(MS) ->
+ MS;
+ _ ->
+ undefined
+ end.
%%---------------------------------------------------------------------------
reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}.
@@ -1818,7 +1870,10 @@ cancel_consumer(CTag, QName, State = #ch{capabilities = Capabilities,
consumer_mapping = CMap}) ->
case rabbit_misc:table_lookup(
Capabilities, <<"consumer_cancel_notify">>) of
- {bool, true} -> ok = send(#'basic.cancel'{consumer_tag = CTag,
+ {bool, true} -> ok =
+
+ rabbit_log:info("Consumer cancel notify suppoerted ~w", [CTag]),
+ send(#'basic.cancel'{consumer_tag = CTag,
nowait = true}, State);
_ -> ok
end,
@@ -1936,10 +1991,11 @@ record_sent(Type, Tag, AckRequired,
true -> ?INCR_STATS(queue_stats, QName, 1, redeliver, State);
false -> ok
end,
+ DeliveredAt = os:system_time(millisecond),
rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState),
UAMQ1 = case AckRequired of
- true -> ?QUEUE:in({DeliveryTag, Tag, {QPid, MsgId}},
- UAMQ);
+ true -> ?QUEUE:in({DeliveryTag, Tag, DeliveredAt,
+ {QPid, MsgId}}, UAMQ);
false -> UAMQ
end,
State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}.
@@ -1952,7 +2008,7 @@ collect_acks(Q, DeliveryTag, Multiple) ->
collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
case ?QUEUE:out(Q) of
- {{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}},
+ {{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Time, _Msg}},
QTail} ->
if CurrentDeliveryTag == DeliveryTag ->
{[UnackedMsg | ToAcc],
@@ -2021,11 +2077,11 @@ notify_queues(State = #ch{consumer_mapping = Consumers,
foreach_per_queue(_F, [], Acc) ->
Acc;
-foreach_per_queue(F, [{_DTag, CTag, {QPid, MsgId}}], Acc) ->
+foreach_per_queue(F, [{_DTag, CTag, _Time, {QPid, MsgId}}], Acc) ->
%% quorum queue, needs the consumer tag
F({QPid, CTag}, [MsgId], Acc);
foreach_per_queue(F, UAL, Acc) ->
- T = lists:foldl(fun ({_DTag, CTag, {QPid, MsgId}}, T) ->
+ T = lists:foldl(fun ({_DTag, CTag, _Time, {QPid, MsgId}}, T) ->
rabbit_misc:gb_trees_cons({QPid, CTag}, MsgId, T)
end, gb_trees:empty(), UAL),
rabbit_misc:gb_trees_fold(fun (Key, Val, Acc0) -> F(Key, Val, Acc0) end, Acc, T).
@@ -2612,7 +2668,7 @@ add_delivery_count_header(#{delivery_count := Count}, Msg) ->
add_delivery_count_header(_, Msg) ->
Msg.
-qpid_to_ref(Pid) when is_pid(Pid) -> Pid;
+qpid_to_ref(Pid) when is_pid(Pid) -> Pid;
qpid_to_ref({Name, _}) -> Name;
%% assume it already is a ref
qpid_to_ref(Ref) -> Ref.
diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl
index eba0965608..dc2678ba10 100644
--- a/test/queue_parallel_SUITE.erl
+++ b/test/queue_parallel_SUITE.erl
@@ -52,6 +52,7 @@ groups() ->
consume_and_nack,
consume_and_requeue_multiple_nack,
consume_and_multiple_nack,
+ consumer_timeout,
basic_cancel,
purge,
basic_recover,
@@ -129,19 +130,21 @@ init_per_group(mirrored_queue, Config) ->
{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
{queue_durable, true}]),
rabbit_ct_helpers:run_steps(Config1, []);
-init_per_group(Group, Config) ->
+init_per_group(Group, Config0) ->
case lists:member({group, Group}, all()) of
true ->
ClusterSize = 2,
- Config1 = rabbit_ct_helpers:set_config(Config, [
- {rmq_nodename_suffix, Group},
- {rmq_nodes_count, ClusterSize}
- ]),
+ Config = rabbit_ct_helpers:merge_app_env(
+ Config0, {rabbit, [{consumer_timeout, 5000}]}),
+ Config1 = rabbit_ct_helpers:set_config(
+ Config, [ {rmq_nodename_suffix, Group},
+ {rmq_nodes_count, ClusterSize}
+ ]),
rabbit_ct_helpers:run_steps(Config1,
- rabbit_ct_broker_helpers:setup_steps() ++
- rabbit_ct_client_helpers:setup_steps());
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps());
false ->
- rabbit_ct_helpers:run_steps(Config, [])
+ rabbit_ct_helpers:run_steps(Config0, [])
end.
end_per_group(Group, Config) ->
@@ -401,6 +404,29 @@ consume_and_multiple_nack(Config) ->
requeue = false}),
wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]).
+consumer_timeout(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ subscribe(Ch, QName, false),
+ receive
+ {#'basic.deliver'{delivery_tag = _,
+ redelivered = false}, _} ->
+ %% do nothing with the delivery should trigger timeout
+ receive
+ #'basic.cancel'{ } ->
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ ok
+ after 20000 ->
+ flush(1),
+ exit(cancel_never_happened)
+ end
+ after 5000 ->
+ exit(deliver_timeout)
+ end.
+
subscribe_and_requeue_nack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
QName = ?config(queue_name, Config),
@@ -614,3 +640,11 @@ receive_basic_deliver(Redelivered) ->
{#'basic.deliver'{redelivered = R}, _} when R == Redelivered ->
ok
end.
+
+flush(T) ->
+ receive X ->
+ ct:pal("flushed ~w", [X]),
+ flush(T)
+ after T ->
+ ok
+ end.