diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-04-03 14:53:01 +0100 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-04-23 14:36:10 +0100 |
| commit | 04e4d0b5ba486abb16e873b4d3df6e8792b009e6 (patch) | |
| tree | e468d787ee460d549d38f57b80290d4d37f31975 | |
| parent | 9d898823c30ff0e56d97053764b2708a299c46eb (diff) | |
| download | rabbitmq-server-git-04e4d0b5ba486abb16e873b4d3df6e8792b009e6.tar.gz | |
Implement consumer channel timeouts
If a consumer has a message awaiting ack for longer than this timeout,
the consumer will either be cancelled (if the client supports it) or
the channel will be closed, to ensure the message does not get
stuck permanently.
| -rw-r--r-- | src/rabbit_amqqueue.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 82 | ||||
| -rw-r--r-- | test/queue_parallel_SUITE.erl | 50 |
3 files changed, 113 insertions, 23 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index fac9d5e50f..8855cfede4 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -74,6 +74,7 @@ -export_type([name/0, qmsg/0, absent_reason/0]). -type name() :: rabbit_types:r('queue'). + -type qpids() :: [pid()]. -type qlen() :: rabbit_types:ok(non_neg_integer()). -type qfun(A) :: fun ((amqqueue:amqqueue()) -> A | no_return()). @@ -1229,12 +1230,11 @@ purge(Q) when ?amqqueue_is_quorum(Q) -> NodeId = amqqueue:get_pid(Q), rabbit_quorum_queue:purge(NodeId). --spec requeue(pid(), +-spec requeue(pid() | amqqueue:ra_server_id(), {rabbit_fifo:consumer_tag(), [msg_id()]}, pid(), quorum_states()) -> 'ok'. - requeue(QPid, {_, MsgIds}, ChPid, QuorumStates) when ?IS_CLASSIC(QPid) -> ok = delegate:invoke(QPid, {gen_server2, call, [{requeue, MsgIds, ChPid}, infinity]}), QuorumStates; diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 59a8cce371..32579050db 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -164,7 +164,8 @@ queue_states, queue_cleanup_timer, %% Message content size limit - max_message_size + max_message_size, + consumer_timeout }). -define(QUEUE, lqueue). 
@@ -483,6 +484,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, Limiter0 end, MaxMessageSize = get_max_message_size(), + ConsumerTimeout = get_consumer_timeout(), State = #ch{state = starting, protocol = Protocol, channel = Channel, @@ -515,7 +517,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, delivery_flow = Flow, interceptor_state = undefined, queue_states = #{}, - max_message_size = MaxMessageSize}, + max_message_size = MaxMessageSize, + consumer_timeout = ConsumerTimeout}, State1 = State#ch{ interceptor_state = rabbit_channel_interceptor:init(State)}, State2 = rabbit_event:init_stats_timer(State1, #ch.stats_timer), @@ -811,14 +814,56 @@ handle_info({{Ref, Node}, LateAnswer}, State = #ch{channel = Channel}) [Channel, LateAnswer, Node]), noreply(State); -handle_info(queue_cleanup, State = #ch{queue_states = QueueStates0}) -> - QueueStates = +handle_info(queue_cleanup, State0 = #ch{channel = Channel, + queue_states = QueueStates0, + queue_names = QNames, + queue_consumers = QCons, + consumer_timeout = Timeout, + unacked_message_q = UAMQ}) -> + QueueStates1 = maps:filter(fun(_, QS) -> QName = rabbit_quorum_queue:queue_name(QS), [] /= rabbit_amqqueue:lookup(QName) end, QueueStates0), - noreply(init_queue_cleanup_timer(State#ch{queue_states = QueueStates})); - + Now = os:system_time(millisecond), + case ?QUEUE:peek(UAMQ) of + {value, {_DTag, ConsumerTag, Time, {QPid, _Msg}}} + when is_integer(Timeout) andalso Time < Now - Timeout -> + rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out " + "waiting on ack", [ConsumerTag, Channel]), + %% Cancel consumer both to the client and queue + %% and return all messages to the queue + QRef = qpid_to_ref(QPid), + QName = maps:get(QRef, QNames), + %% cancel the consumer with the client + State1 = cancel_consumer(ConsumerTag, QName, State0), + [Q] = rabbit_amqqueue:lookup([QName]), + %% send basic cancel to the queue + {ok, QueueStates2} = 
rabbit_amqqueue:basic_cancel( + Q, self(), ConsumerTag, undefined, + <<"broker">>, QueueStates1), + %% return all in-flight messages + {MsgIds, Rem} = lists:foldl( + fun({_DelTag, ConTag, _Time, {_, MsgId}}, + {Ids, Rem}) + when ConTag == ConsumerTag -> + {[MsgId | Ids], Rem}; + (Unacked, {Ids, Rem}) -> + {Ids, ?QUEUE:in(Unacked, Rem)} + end, {[], ?QUEUE:new()}, + ?QUEUE:to_list(UAMQ)), + QueueStates = rabbit_amqqueue:requeue(QPid, {ConsumerTag, MsgIds}, + self(), QueueStates2), + + State = State1#ch{queue_states = QueueStates, + queue_consumers = maps:remove(QRef, QCons), + unacked_message_q = Rem}, + noreply(init_queue_cleanup_timer(State)); + _ -> + noreply( + init_queue_cleanup_timer( + State0#ch{queue_states = QueueStates1})) + end; handle_info({channel_source, Source}, State = #ch{}) -> noreply(State#ch{source = Source}). @@ -857,6 +902,13 @@ get_max_message_size() -> ?MAX_MSG_SIZE end. +get_consumer_timeout() -> + case application:get_env(rabbit, consumer_timeout) of + {ok, MS} when is_integer(MS) -> + MS; + _ -> + undefined + end. %%--------------------------------------------------------------------------- reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}. 
@@ -1818,7 +1870,10 @@ cancel_consumer(CTag, QName, State = #ch{capabilities = Capabilities, consumer_mapping = CMap}) -> case rabbit_misc:table_lookup( Capabilities, <<"consumer_cancel_notify">>) of - {bool, true} -> ok = send(#'basic.cancel'{consumer_tag = CTag, + {bool, true} -> ok = + + rabbit_log:info("Consumer cancel notify suppoerted ~w", [CTag]), + send(#'basic.cancel'{consumer_tag = CTag, nowait = true}, State); _ -> ok end, @@ -1936,10 +1991,11 @@ record_sent(Type, Tag, AckRequired, true -> ?INCR_STATS(queue_stats, QName, 1, redeliver, State); false -> ok end, + DeliveredAt = os:system_time(millisecond), rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState), UAMQ1 = case AckRequired of - true -> ?QUEUE:in({DeliveryTag, Tag, {QPid, MsgId}}, - UAMQ); + true -> ?QUEUE:in({DeliveryTag, Tag, DeliveredAt, + {QPid, MsgId}}, UAMQ); false -> UAMQ end, State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}. @@ -1952,7 +2008,7 @@ collect_acks(Q, DeliveryTag, Multiple) -> collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> case ?QUEUE:out(Q) of - {{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}}, + {{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Time, _Msg}}, QTail} -> if CurrentDeliveryTag == DeliveryTag -> {[UnackedMsg | ToAcc], @@ -2021,11 +2077,11 @@ notify_queues(State = #ch{consumer_mapping = Consumers, foreach_per_queue(_F, [], Acc) -> Acc; -foreach_per_queue(F, [{_DTag, CTag, {QPid, MsgId}}], Acc) -> +foreach_per_queue(F, [{_DTag, CTag, _Time, {QPid, MsgId}}], Acc) -> %% quorum queue, needs the consumer tag F({QPid, CTag}, [MsgId], Acc); foreach_per_queue(F, UAL, Acc) -> - T = lists:foldl(fun ({_DTag, CTag, {QPid, MsgId}}, T) -> + T = lists:foldl(fun ({_DTag, CTag, _Time, {QPid, MsgId}}, T) -> rabbit_misc:gb_trees_cons({QPid, CTag}, MsgId, T) end, gb_trees:empty(), UAL), rabbit_misc:gb_trees_fold(fun (Key, Val, Acc0) -> F(Key, Val, Acc0) end, Acc, T). 
@@ -2612,7 +2668,7 @@ add_delivery_count_header(#{delivery_count := Count}, Msg) -> add_delivery_count_header(_, Msg) -> Msg. -qpid_to_ref(Pid) when is_pid(Pid) -> Pid; +qpid_to_ref(Pid) when is_pid(Pid) -> Pid; qpid_to_ref({Name, _}) -> Name; %% assume it already is a ref qpid_to_ref(Ref) -> Ref. diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl index eba0965608..dc2678ba10 100644 --- a/test/queue_parallel_SUITE.erl +++ b/test/queue_parallel_SUITE.erl @@ -52,6 +52,7 @@ groups() -> consume_and_nack, consume_and_requeue_multiple_nack, consume_and_multiple_nack, + consumer_timeout, basic_cancel, purge, basic_recover, @@ -129,19 +130,21 @@ init_per_group(mirrored_queue, Config) -> {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, {queue_durable, true}]), rabbit_ct_helpers:run_steps(Config1, []); -init_per_group(Group, Config) -> +init_per_group(Group, Config0) -> case lists:member({group, Group}, all()) of true -> ClusterSize = 2, - Config1 = rabbit_ct_helpers:set_config(Config, [ - {rmq_nodename_suffix, Group}, - {rmq_nodes_count, ClusterSize} - ]), + Config = rabbit_ct_helpers:merge_app_env( + Config0, {rabbit, [{consumer_timeout, 5000}]}), + Config1 = rabbit_ct_helpers:set_config( + Config, [ {rmq_nodename_suffix, Group}, + {rmq_nodes_count, ClusterSize} + ]), rabbit_ct_helpers:run_steps(Config1, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()); + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()); false -> - rabbit_ct_helpers:run_steps(Config, []) + rabbit_ct_helpers:run_steps(Config0, []) end. end_per_group(Group, Config) -> @@ -401,6 +404,29 @@ consume_and_multiple_nack(Config) -> requeue = false}), wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). 
+consumer_timeout(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive + {#'basic.deliver'{delivery_tag = _, + redelivered = false}, _} -> + %% do nothing with the delivery should trigger timeout + receive + #'basic.cancel'{ } -> + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + ok + after 20000 -> + flush(1), + exit(cancel_never_happened) + end + after 5000 -> + exit(deliver_timeout) + end. + subscribe_and_requeue_nack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), QName = ?config(queue_name, Config), @@ -614,3 +640,11 @@ receive_basic_deliver(Redelivered) -> {#'basic.deliver'{redelivered = R}, _} when R == Redelivered -> ok end. + +flush(T) -> + receive X -> + ct:pal("flushed ~w", [X]), + flush(T) + after T -> + ok + end. |
