summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2018-12-07 21:04:56 +0300
committerGitHub <noreply@github.com>2018-12-07 21:04:56 +0300
commitace9df7d26eb639922e5a5c4f1a91563509d5552 (patch)
treed297f277cc44bd82308ab263fc86b764ef8505b7
parent4fa8cc8678519bb2a618fff670b761858a2f3807 (diff)
parent61e33f6e15878d8742b53ee7e256548e31203c75 (diff)
downloadrabbitmq-server-git-ace9df7d26eb639922e5a5c4f1a91563509d5552.tar.gz
Merge pull request #1794 from rabbitmq/rabbitmq-server-1783
Implement node health check timeouts
-rw-r--r--src/rabbit_channel.erl96
-rw-r--r--src/rabbit_queue_consumers.erl18
2 files changed, 81 insertions, 33 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 8fb73d6156..937e0a4ba3 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -161,6 +161,7 @@
queue_cleanup_timer
}).
+-define(QUEUE, lqueue).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -339,12 +340,29 @@ list_local() ->
info_keys() -> ?INFO_KEYS.
info(Pid) ->
- gen_server2:call(Pid, info, infinity).
+ {Timeout, Deadline} = get_operation_timeout_and_deadline(),
+ try
+ case gen_server2:call(Pid, {info, Deadline}, Timeout) of
+ {ok, Res} -> Res;
+ {error, Error} -> throw(Error)
+ end
+ catch
+ exit:{timeout, _} ->
+ rabbit_log:error("Timed out getting channel ~p info", [Pid]),
+ throw(timeout)
+ end.
info(Pid, Items) ->
- case gen_server2:call(Pid, {info, Items}, infinity) of
- {ok, Res} -> Res;
- {error, Error} -> throw(Error)
+ {Timeout, Deadline} = get_operation_timeout_and_deadline(),
+ try
+ case gen_server2:call(Pid, {{info, Items}, Deadline}, Timeout) of
+ {ok, Res} -> Res;
+ {error, Error} -> throw(Error)
+ end
+ catch
+ exit:{timeout, _} ->
+ rabbit_log:error("Timed out getting channel ~p info", [Pid]),
+ throw(timeout)
end.
info_all() ->
@@ -433,7 +451,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
limiter = Limiter,
tx = none,
next_tag = 1,
- unacked_message_q = queue:new(),
+ unacked_message_q = ?QUEUE:new(),
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
@@ -493,13 +511,20 @@ prioritise_info(Msg, _Len, _State) ->
handle_call(flush, _From, State) ->
reply(ok, State);
-handle_call(info, _From, State) ->
- reply(infos(?INFO_KEYS, State), State);
+handle_call({info, Deadline}, _From, State) ->
+ try
+ reply({ok, infos(?INFO_KEYS, Deadline, State)}, State)
+ catch
+ Error ->
+ reply({error, Error}, State)
+ end;
-handle_call({info, Items}, _From, State) ->
+handle_call({{info, Items}, Deadline}, _From, State) ->
try
- reply({ok, infos(Items, State)}, State)
- catch Error -> reply({error, Error}, State)
+ reply({ok, infos(Items, Deadline, State)}, State)
+ catch
+ Error ->
+ reply({error, Error}, State)
end;
handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) ->
@@ -1181,7 +1206,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
DQ = {Delivery#delivery{flow = Flow}, QNames},
{noreply, case Tx of
none -> deliver_to_queues(DQ, State1);
- {Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs),
+ {Msgs, Acks} -> Msgs1 = ?QUEUE:in(DQ, Msgs),
State1#ch{tx = {Msgs1, Acks}}
end};
{error, Reason} ->
@@ -1389,10 +1414,10 @@ handle_method(#'basic.qos'{global = true,
handle_method(#'basic.qos'{global = true,
prefetch_count = PrefetchCount},
_, State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) ->
- %% TODO queue:len(UAMQ) is not strictly right since that counts
+ %% TODO ?QUEUE:len(UAMQ) is not strictly right since that counts
%% unacked messages from basic.get too. Pretty obscure though.
Limiter1 = rabbit_limiter:limit_prefetch(Limiter,
- PrefetchCount, queue:len(UAMQ)),
+ PrefetchCount, ?QUEUE:len(UAMQ)),
case ((not rabbit_limiter:is_active(Limiter)) andalso
rabbit_limiter:is_active(Limiter1)) of
true -> rabbit_amqqueue:activate_limit_all(
@@ -1405,7 +1430,7 @@ handle_method(#'basic.recover_async'{requeue = true},
_, State = #ch{unacked_message_q = UAMQ, limiter = Limiter,
queue_states = QueueStates0}) ->
OkFun = fun () -> ok end,
- UAMQL = queue:to_list(UAMQ),
+ UAMQL = ?QUEUE:to_list(UAMQ),
QueueStates =
foreach_per_queue(
fun ({QPid, CTag}, MsgIds, Acc0) ->
@@ -1419,7 +1444,7 @@ handle_method(#'basic.recover_async'{requeue = true},
ok = notify_limiter(Limiter, UAMQL),
%% No answer required - basic.recover is the newer, synchronous
%% variant of this method
- {noreply, State#ch{unacked_message_q = queue:new(),
+ {noreply, State#ch{unacked_message_q = ?QUEUE:new(),
queue_states = QueueStates}};
handle_method(#'basic.recover_async'{requeue = false}, _, _State) ->
@@ -1543,7 +1568,7 @@ handle_method(#'tx.rollback'{}, _, #ch{tx = none}) ->
handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
tx = {_Msgs, Acks}}) ->
AcksL = lists:append(lists:reverse([lists:reverse(L) || {_, L} <- Acks])),
- UAMQ1 = queue:from_list(lists:usort(AcksL ++ queue:to_list(UAMQ))),
+ UAMQ1 = ?QUEUE:from_list(lists:usort(AcksL ++ ?QUEUE:to_list(UAMQ))),
{reply, #'tx.rollback_ok'{}, State#ch{unacked_message_q = UAMQ1,
tx = new_tx()}};
@@ -1835,28 +1860,28 @@ record_sent(Type, Tag, AckRequired,
end,
rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState),
UAMQ1 = case AckRequired of
- true -> queue:in({DeliveryTag, Tag, {QPid, MsgId}},
- UAMQ);
+ true -> ?QUEUE:in({DeliveryTag, Tag, {QPid, MsgId}},
+ UAMQ);
false -> UAMQ
end,
State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}.
%% NB: returns acks in youngest-first order
collect_acks(Q, 0, true) ->
- {lists:reverse(queue:to_list(Q)), queue:new()};
+ {lists:reverse(?QUEUE:to_list(Q)), ?QUEUE:new()};
collect_acks(Q, DeliveryTag, Multiple) ->
collect_acks([], [], Q, DeliveryTag, Multiple).
collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
- case queue:out(Q) of
+ case ?QUEUE:out(Q) of
{{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}},
QTail} ->
if CurrentDeliveryTag == DeliveryTag ->
{[UnackedMsg | ToAcc],
case PrefixAcc of
[] -> QTail;
- _ -> queue:join(
- queue:from_list(lists:reverse(PrefixAcc)),
+ _ -> ?QUEUE:join(
+ ?QUEUE:from_list(lists:reverse(PrefixAcc)),
QTail)
end};
Multiple ->
@@ -1902,7 +1927,7 @@ incr_queue_stats(QPid, QNames, MsgIds, State) ->
%% (reject w requeue), 'false' (reject w/o requeue). The msg ids, as
%% well as the list overall, are in "most-recent (generally youngest)
%% ack first" order.
-new_tx() -> {queue:new(), []}.
+new_tx() -> {?QUEUE:new(), []}.
notify_queues(State = #ch{state = closing}) ->
{ok, State};
@@ -2134,6 +2159,17 @@ complete_tx(State = #ch{tx = failed}) ->
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
+infos(Items, Deadline, State) ->
+ [begin
+ Now = now_millis(),
+ if
+ Now > Deadline ->
+ throw(timeout);
+ true ->
+ {Item, i(Item, State)}
+ end
+ end || Item <- Items].
+
i(pid, _) -> self();
i(connection, #ch{conn_pid = ConnPid}) -> ConnPid;
i(number, #ch{channel = Channel}) -> Channel;
@@ -2145,8 +2181,8 @@ i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(name, State) -> name(State);
i(consumer_count, #ch{consumer_mapping = CM}) -> maps:size(CM);
i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC);
-i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ);
-i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> queue:len(Msgs);
+i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> ?QUEUE:len(UAMQ);
+i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> ?QUEUE:len(Msgs);
i(messages_uncommitted, #ch{}) -> 0;
i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks);
i(acks_uncommitted, #ch{}) -> 0;
@@ -2502,3 +2538,13 @@ qpid_to_ref(Pid) when is_pid(Pid) -> Pid;
qpid_to_ref({Name, _}) -> Name;
%% assume it already is a ref
qpid_to_ref(Ref) -> Ref.
+
+now_millis() ->
+ erlang:monotonic_time(millisecond).
+
+get_operation_timeout_and_deadline() ->
+ % NB: can't use get_operation_timeout because
+ % this code may not be running via the channel Pid
+ Timeout = ?CHANNEL_OPERATION_TIMEOUT,
+ Deadline = now_millis() + Timeout,
+ {Timeout, Deadline}.
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index e3322f3f64..98582c8117 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -25,6 +25,8 @@
%%----------------------------------------------------------------------------
+-define(QUEUE, lqueue).
+
-define(UNSENT_MESSAGE_LIMIT, 200).
%% Utilisation average calculations are all in μs.
@@ -122,7 +124,7 @@ consumers(Consumers, Acc) ->
count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
unacknowledged_message_count() ->
- lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]).
+ lists:sum([?QUEUE:len(C#cr.acktags) || C <- all_ch_record()]).
add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Prefetch, Args, IsEmpty,
Username, State = #state{consumers = Consumers,
@@ -179,7 +181,7 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) ->
All = priority_queue:join(Consumers, BlockedQ),
ok = erase_ch_record(C),
Filtered = priority_queue:filter(chan_pred(ChPid, true), All),
- {[AckTag || {AckTag, _CTag} <- queue:to_list(ChAckTags)],
+ {[AckTag || {AckTag, _CTag} <- ?QUEUE:to_list(ChAckTags)],
tags(priority_queue:to_list(Filtered)),
State#state{consumers = remove_consumers(ChPid, Consumers)}}
end.
@@ -237,7 +239,7 @@ deliver_to_consumer(FetchFun,
rabbit_channel:deliver(ChPid, CTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
ChAckTags1 = case AckRequired of
- true -> queue:in({AckTag, CTag}, ChAckTags);
+ true -> ?QUEUE:in({AckTag, CTag}, ChAckTags);
false -> ChAckTags
end,
update_ch_record(C#cr{acktags = ChAckTags1,
@@ -246,7 +248,7 @@ deliver_to_consumer(FetchFun,
record_ack(ChPid, LimiterPid, AckTag) ->
C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid),
- update_ch_record(C#cr{acktags = queue:in({AckTag, none}, ChAckTags)}),
+ update_ch_record(C#cr{acktags = ?QUEUE:in({AckTag, none}, ChAckTags)}),
ok.
subtract_acks(ChPid, AckTags, State) ->
@@ -274,9 +276,9 @@ subtract_acks(ChPid, AckTags, State) ->
subtract_acks([], [], CTagCounts, AckQ) ->
{CTagCounts, AckQ};
subtract_acks([], Prefix, CTagCounts, AckQ) ->
- {CTagCounts, queue:join(queue:from_list(lists:reverse(Prefix)), AckQ)};
+ {CTagCounts, ?QUEUE:join(?QUEUE:from_list(lists:reverse(Prefix)), AckQ)};
subtract_acks([T | TL] = AckTags, Prefix, CTagCounts, AckQ) ->
- case queue:out(AckQ) of
+ case ?QUEUE:out(AckQ) of
{{value, {T, CTag}}, QTail} ->
subtract_acks(TL, Prefix,
maps:update_with(CTag, fun (Old) -> Old + 1 end, 1, CTagCounts), QTail);
@@ -380,7 +382,7 @@ ch_record(ChPid, LimiterPid) ->
Limiter = rabbit_limiter:client(LimiterPid),
C = #cr{ch_pid = ChPid,
monitor_ref = MonitorRef,
- acktags = queue:new(),
+ acktags = ?QUEUE:new(),
consumer_count = 0,
blocked_consumers = priority_queue:new(),
limiter = Limiter,
@@ -393,7 +395,7 @@ ch_record(ChPid, LimiterPid) ->
update_ch_record(C = #cr{consumer_count = ConsumerCount,
acktags = ChAckTags,
unsent_message_count = UnsentMessageCount}) ->
- case {queue:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of
+ case {?QUEUE:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of
{true, 0, 0} -> ok = erase_ch_record(C);
_ -> ok = store_ch_record(C)
end,