summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl4
-rw-r--r--src/rabbit_channel.erl82
2 files changed, 71 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index fac9d5e50f..8855cfede4 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -74,6 +74,7 @@
-export_type([name/0, qmsg/0, absent_reason/0]).
-type name() :: rabbit_types:r('queue').
+
-type qpids() :: [pid()].
-type qlen() :: rabbit_types:ok(non_neg_integer()).
-type qfun(A) :: fun ((amqqueue:amqqueue()) -> A | no_return()).
@@ -1229,12 +1230,11 @@ purge(Q) when ?amqqueue_is_quorum(Q) ->
NodeId = amqqueue:get_pid(Q),
rabbit_quorum_queue:purge(NodeId).
--spec requeue(pid(),
+-spec requeue(pid() | amqqueue:ra_server_id(),
{rabbit_fifo:consumer_tag(), [msg_id()]},
pid(),
quorum_states()) ->
'ok'.
-
requeue(QPid, {_, MsgIds}, ChPid, QuorumStates) when ?IS_CLASSIC(QPid) ->
ok = delegate:invoke(QPid, {gen_server2, call, [{requeue, MsgIds, ChPid}, infinity]}),
QuorumStates;
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 59a8cce371..32579050db 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -164,7 +164,8 @@
queue_states,
queue_cleanup_timer,
%% Message content size limit
- max_message_size
+ max_message_size,
+ consumer_timeout
}).
-define(QUEUE, lqueue).
@@ -483,6 +484,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
Limiter0
end,
MaxMessageSize = get_max_message_size(),
+ ConsumerTimeout = get_consumer_timeout(),
State = #ch{state = starting,
protocol = Protocol,
channel = Channel,
@@ -515,7 +517,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
delivery_flow = Flow,
interceptor_state = undefined,
queue_states = #{},
- max_message_size = MaxMessageSize},
+ max_message_size = MaxMessageSize,
+ consumer_timeout = ConsumerTimeout},
State1 = State#ch{
interceptor_state = rabbit_channel_interceptor:init(State)},
State2 = rabbit_event:init_stats_timer(State1, #ch.stats_timer),
@@ -811,14 +814,56 @@ handle_info({{Ref, Node}, LateAnswer}, State = #ch{channel = Channel})
[Channel, LateAnswer, Node]),
noreply(State);
-handle_info(queue_cleanup, State = #ch{queue_states = QueueStates0}) ->
- QueueStates =
+handle_info(queue_cleanup, State0 = #ch{channel = Channel,
+ queue_states = QueueStates0,
+ queue_names = QNames,
+ queue_consumers = QCons,
+ consumer_timeout = Timeout,
+ unacked_message_q = UAMQ}) ->
+ QueueStates1 =
maps:filter(fun(_, QS) ->
QName = rabbit_quorum_queue:queue_name(QS),
[] /= rabbit_amqqueue:lookup(QName)
end, QueueStates0),
- noreply(init_queue_cleanup_timer(State#ch{queue_states = QueueStates}));
-
+ Now = os:system_time(millisecond),
+ case ?QUEUE:peek(UAMQ) of
+ {value, {_DTag, ConsumerTag, Time, {QPid, _Msg}}}
+ when is_integer(Timeout) andalso Time < Now - Timeout ->
+ rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out "
+ "waiting on ack", [ConsumerTag, Channel]),
+ %% Cancel consumer both to the client and queue
+ %% and return all messages to the queue
+ QRef = qpid_to_ref(QPid),
+ QName = maps:get(QRef, QNames),
+ %% cancel the consumer with the client
+ State1 = cancel_consumer(ConsumerTag, QName, State0),
+ [Q] = rabbit_amqqueue:lookup([QName]),
+ %% send basic cancel to the queue
+ {ok, QueueStates2} = rabbit_amqqueue:basic_cancel(
+ Q, self(), ConsumerTag, undefined,
+ <<"broker">>, QueueStates1),
+ %% return all in-flight messages
+ {MsgIds, Rem} = lists:foldl(
+ fun({_DelTag, ConTag, _Time, {_, MsgId}},
+ {Ids, Rem})
+ when ConTag == ConsumerTag ->
+ {[MsgId | Ids], Rem};
+ (Unacked, {Ids, Rem}) ->
+ {Ids, ?QUEUE:in(Unacked, Rem)}
+ end, {[], ?QUEUE:new()},
+ ?QUEUE:to_list(UAMQ)),
+ QueueStates = rabbit_amqqueue:requeue(QPid, {ConsumerTag, MsgIds},
+ self(), QueueStates2),
+
+ State = State1#ch{queue_states = QueueStates,
+ queue_consumers = maps:remove(QRef, QCons),
+ unacked_message_q = Rem},
+ noreply(init_queue_cleanup_timer(State));
+ _ ->
+ noreply(
+ init_queue_cleanup_timer(
+ State0#ch{queue_states = QueueStates1}))
+ end;
handle_info({channel_source, Source}, State = #ch{}) ->
noreply(State#ch{source = Source}).
@@ -857,6 +902,13 @@ get_max_message_size() ->
?MAX_MSG_SIZE
end.
+get_consumer_timeout() ->
+ case application:get_env(rabbit, consumer_timeout) of
+ {ok, MS} when is_integer(MS) ->
+ MS;
+ _ ->
+ undefined
+ end.
%%---------------------------------------------------------------------------
reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}.
@@ -1818,7 +1870,10 @@ cancel_consumer(CTag, QName, State = #ch{capabilities = Capabilities,
consumer_mapping = CMap}) ->
case rabbit_misc:table_lookup(
Capabilities, <<"consumer_cancel_notify">>) of
- {bool, true} -> ok = send(#'basic.cancel'{consumer_tag = CTag,
+ {bool, true} -> ok =
+
+ rabbit_log:info("Consumer cancel notify suppoerted ~w", [CTag]),
+ send(#'basic.cancel'{consumer_tag = CTag,
nowait = true}, State);
_ -> ok
end,
@@ -1936,10 +1991,11 @@ record_sent(Type, Tag, AckRequired,
true -> ?INCR_STATS(queue_stats, QName, 1, redeliver, State);
false -> ok
end,
+ DeliveredAt = os:system_time(millisecond),
rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState),
UAMQ1 = case AckRequired of
- true -> ?QUEUE:in({DeliveryTag, Tag, {QPid, MsgId}},
- UAMQ);
+ true -> ?QUEUE:in({DeliveryTag, Tag, DeliveredAt,
+ {QPid, MsgId}}, UAMQ);
false -> UAMQ
end,
State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}.
@@ -1952,7 +2008,7 @@ collect_acks(Q, DeliveryTag, Multiple) ->
collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
case ?QUEUE:out(Q) of
- {{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}},
+ {{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Time, _Msg}},
QTail} ->
if CurrentDeliveryTag == DeliveryTag ->
{[UnackedMsg | ToAcc],
@@ -2021,11 +2077,11 @@ notify_queues(State = #ch{consumer_mapping = Consumers,
foreach_per_queue(_F, [], Acc) ->
Acc;
-foreach_per_queue(F, [{_DTag, CTag, {QPid, MsgId}}], Acc) ->
+foreach_per_queue(F, [{_DTag, CTag, _Time, {QPid, MsgId}}], Acc) ->
%% quorum queue, needs the consumer tag
F({QPid, CTag}, [MsgId], Acc);
foreach_per_queue(F, UAL, Acc) ->
- T = lists:foldl(fun ({_DTag, CTag, {QPid, MsgId}}, T) ->
+ T = lists:foldl(fun ({_DTag, CTag, _Time, {QPid, MsgId}}, T) ->
rabbit_misc:gb_trees_cons({QPid, CTag}, MsgId, T)
end, gb_trees:empty(), UAL),
rabbit_misc:gb_trees_fold(fun (Key, Val, Acc0) -> F(Key, Val, Acc0) end, Acc, T).
@@ -2612,7 +2668,7 @@ add_delivery_count_header(#{delivery_count := Count}, Msg) ->
add_delivery_count_header(_, Msg) ->
Msg.
-qpid_to_ref(Pid) when is_pid(Pid) -> Pid;
+qpid_to_ref(Pid) when is_pid(Pid) -> Pid;
qpid_to_ref({Name, _}) -> Name;
%% assume it already is a ref
qpid_to_ref(Ref) -> Ref.