diff options
| author | Michael Klishin <michael@novemberain.com> | 2018-12-07 21:04:56 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2018-12-07 21:04:56 +0300 |
| commit | ace9df7d26eb639922e5a5c4f1a91563509d5552 (patch) | |
| tree | d297f277cc44bd82308ab263fc86b764ef8505b7 | |
| parent | 4fa8cc8678519bb2a618fff670b761858a2f3807 (diff) | |
| parent | 61e33f6e15878d8742b53ee7e256548e31203c75 (diff) | |
| download | rabbitmq-server-git-ace9df7d26eb639922e5a5c4f1a91563509d5552.tar.gz | |
Merge pull request #1794 from rabbitmq/rabbitmq-server-1783
Implement node health check timeouts
| -rw-r--r-- | src/rabbit_channel.erl | 96 | ||||
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 18 |
2 files changed, 81 insertions, 33 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 8fb73d6156..937e0a4ba3 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -161,6 +161,7 @@ queue_cleanup_timer }). +-define(QUEUE, lqueue). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -339,12 +340,29 @@ list_local() -> info_keys() -> ?INFO_KEYS. info(Pid) -> - gen_server2:call(Pid, info, infinity). + {Timeout, Deadline} = get_operation_timeout_and_deadline(), + try + case gen_server2:call(Pid, {info, Deadline}, Timeout) of + {ok, Res} -> Res; + {error, Error} -> throw(Error) + end + catch + exit:{timeout, _} -> + rabbit_log:error("Timed out getting channel ~p info", [Pid]), + throw(timeout) + end. info(Pid, Items) -> - case gen_server2:call(Pid, {info, Items}, infinity) of - {ok, Res} -> Res; - {error, Error} -> throw(Error) + {Timeout, Deadline} = get_operation_timeout_and_deadline(), + try + case gen_server2:call(Pid, {{info, Items}, Deadline}, Timeout) of + {ok, Res} -> Res; + {error, Error} -> throw(Error) + end + catch + exit:{timeout, _} -> + rabbit_log:error("Timed out getting channel ~p info", [Pid]), + throw(timeout) end. info_all() -> @@ -433,7 +451,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, limiter = Limiter, tx = none, next_tag = 1, - unacked_message_q = queue:new(), + unacked_message_q = ?QUEUE:new(), user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, @@ -493,13 +511,20 @@ prioritise_info(Msg, _Len, _State) -> handle_call(flush, _From, State) -> reply(ok, State); -handle_call(info, _From, State) -> - reply(infos(?INFO_KEYS, State), State); +handle_call({info, Deadline}, _From, State) -> + try + reply({ok, infos(?INFO_KEYS, Deadline, State)}, State) + catch + Error -> + reply({error, Error}, State) + end; -handle_call({info, Items}, _From, State) -> +handle_call({{info, Items}, Deadline}, _From, State) -> try - reply({ok, infos(Items, State)}, State) - catch Error -> reply({error, Error}, State) + reply({ok, infos(Items, Deadline, State)}, State) + catch + Error -> + reply({error, Error}, State) end; handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) -> @@ -1181,7 +1206,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, DQ = {Delivery#delivery{flow = Flow}, QNames}, {noreply, case Tx of none -> deliver_to_queues(DQ, State1); - {Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs), + {Msgs, Acks} -> Msgs1 = ?QUEUE:in(DQ, Msgs), State1#ch{tx = {Msgs1, Acks}} end}; {error, Reason} -> @@ -1389,10 +1414,10 @@ handle_method(#'basic.qos'{global = true, handle_method(#'basic.qos'{global = true, prefetch_count = PrefetchCount}, _, State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) -> - %% TODO queue:len(UAMQ) is not strictly right since that counts + %% TODO ?QUEUE:len(UAMQ) is not strictly right since that counts %% unacked messages from basic.get too. Pretty obscure though. Limiter1 = rabbit_limiter:limit_prefetch(Limiter, - PrefetchCount, queue:len(UAMQ)), + PrefetchCount, ?QUEUE:len(UAMQ)), case ((not rabbit_limiter:is_active(Limiter)) andalso rabbit_limiter:is_active(Limiter1)) of true -> rabbit_amqqueue:activate_limit_all( @@ -1405,7 +1430,7 @@ handle_method(#'basic.recover_async'{requeue = true}, _, State = #ch{unacked_message_q = UAMQ, limiter = Limiter, queue_states = QueueStates0}) -> OkFun = fun () -> ok end, - UAMQL = queue:to_list(UAMQ), + UAMQL = ?QUEUE:to_list(UAMQ), QueueStates = foreach_per_queue( fun ({QPid, CTag}, MsgIds, Acc0) -> @@ -1419,7 +1444,7 @@ handle_method(#'basic.recover_async'{requeue = true}, ok = notify_limiter(Limiter, UAMQL), %% No answer required - basic.recover is the newer, synchronous %% variant of this method - {noreply, State#ch{unacked_message_q = queue:new(), + {noreply, State#ch{unacked_message_q = ?QUEUE:new(), queue_states = QueueStates}}; handle_method(#'basic.recover_async'{requeue = false}, _, _State) -> @@ -1543,7 +1568,7 @@ handle_method(#'tx.rollback'{}, _, #ch{tx = none}) -> handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, tx = {_Msgs, Acks}}) -> AcksL = lists:append(lists:reverse([lists:reverse(L) || {_, L} <- Acks])), - UAMQ1 = queue:from_list(lists:usort(AcksL ++ queue:to_list(UAMQ))), + UAMQ1 = ?QUEUE:from_list(lists:usort(AcksL ++ ?QUEUE:to_list(UAMQ))), {reply, #'tx.rollback_ok'{}, State#ch{unacked_message_q = UAMQ1, tx = new_tx()}}; @@ -1835,28 +1860,28 @@ record_sent(Type, Tag, AckRequired, end, rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState), UAMQ1 = case AckRequired of - true -> queue:in({DeliveryTag, Tag, {QPid, MsgId}}, - UAMQ); + true -> ?QUEUE:in({DeliveryTag, Tag, {QPid, MsgId}}, + UAMQ); false -> UAMQ end, State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}. %% NB: returns acks in youngest-first order collect_acks(Q, 0, true) -> - {lists:reverse(queue:to_list(Q)), queue:new()}; + {lists:reverse(?QUEUE:to_list(Q)), ?QUEUE:new()}; collect_acks(Q, DeliveryTag, Multiple) -> collect_acks([], [], Q, DeliveryTag, Multiple). collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> - case queue:out(Q) of + case ?QUEUE:out(Q) of {{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}}, QTail} -> if CurrentDeliveryTag == DeliveryTag -> {[UnackedMsg | ToAcc], case PrefixAcc of [] -> QTail; - _ -> queue:join( - queue:from_list(lists:reverse(PrefixAcc)), + _ -> ?QUEUE:join( + ?QUEUE:from_list(lists:reverse(PrefixAcc)), QTail) end}; Multiple -> @@ -1902,7 +1927,7 @@ incr_queue_stats(QPid, QNames, MsgIds, State) -> %% (reject w requeue), 'false' (reject w/o requeue). The msg ids, as %% well as the list overall, are in "most-recent (generally youngest) %% ack first" order. -new_tx() -> {queue:new(), []}. +new_tx() -> {?QUEUE:new(), []}. notify_queues(State = #ch{state = closing}) -> {ok, State}; @@ -2134,6 +2159,17 @@ complete_tx(State = #ch{tx = failed}) -> infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. +infos(Items, Deadline, State) -> + [begin + Now = now_millis(), + if + Now > Deadline -> + throw(timeout); + true -> + {Item, i(Item, State)} + end + end || Item <- Items]. + i(pid, _) -> self(); i(connection, #ch{conn_pid = ConnPid}) -> ConnPid; i(number, #ch{channel = Channel}) -> Channel; @@ -2145,8 +2181,8 @@ i(confirm, #ch{confirm_enabled = CE}) -> CE; i(name, State) -> name(State); i(consumer_count, #ch{consumer_mapping = CM}) -> maps:size(CM); i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC); -i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ); -i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> queue:len(Msgs); +i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> ?QUEUE:len(UAMQ); +i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> ?QUEUE:len(Msgs); i(messages_uncommitted, #ch{}) -> 0; i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks); i(acks_uncommitted, #ch{}) -> 0; @@ -2502,3 +2538,13 @@ qpid_to_ref(Pid) when is_pid(Pid) -> Pid; qpid_to_ref({Name, _}) -> Name; %% assume it already is a ref qpid_to_ref(Ref) -> Ref. + +now_millis() -> + erlang:monotonic_time(millisecond). + +get_operation_timeout_and_deadline() -> + % NB: can't use get_operation_timeout because + % this code may not be running via the channel Pid + Timeout = ?CHANNEL_OPERATION_TIMEOUT, + Deadline = now_millis() + Timeout, + {Timeout, Deadline}. diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index e3322f3f64..98582c8117 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -25,6 +25,8 @@ %%---------------------------------------------------------------------------- +-define(QUEUE, lqueue). + -define(UNSENT_MESSAGE_LIMIT, 200). %% Utilisation average calculations are all in μs. @@ -122,7 +124,7 @@ consumers(Consumers, Acc) -> count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). unacknowledged_message_count() -> - lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]). + lists:sum([?QUEUE:len(C#cr.acktags) || C <- all_ch_record()]). add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Prefetch, Args, IsEmpty, Username, State = #state{consumers = Consumers, @@ -179,7 +181,7 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) -> All = priority_queue:join(Consumers, BlockedQ), ok = erase_ch_record(C), Filtered = priority_queue:filter(chan_pred(ChPid, true), All), - {[AckTag || {AckTag, _CTag} <- queue:to_list(ChAckTags)], + {[AckTag || {AckTag, _CTag} <- ?QUEUE:to_list(ChAckTags)], tags(priority_queue:to_list(Filtered)), State#state{consumers = remove_consumers(ChPid, Consumers)}} end. @@ -237,7 +239,7 @@ deliver_to_consumer(FetchFun, rabbit_channel:deliver(ChPid, CTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), ChAckTags1 = case AckRequired of - true -> queue:in({AckTag, CTag}, ChAckTags); + true -> ?QUEUE:in({AckTag, CTag}, ChAckTags); false -> ChAckTags end, update_ch_record(C#cr{acktags = ChAckTags1, @@ -246,7 +248,7 @@ deliver_to_consumer(FetchFun, record_ack(ChPid, LimiterPid, AckTag) -> C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid), - update_ch_record(C#cr{acktags = queue:in({AckTag, none}, ChAckTags)}), + update_ch_record(C#cr{acktags = ?QUEUE:in({AckTag, none}, ChAckTags)}), ok. subtract_acks(ChPid, AckTags, State) -> @@ -274,9 +276,9 @@ subtract_acks(ChPid, AckTags, State) -> subtract_acks([], [], CTagCounts, AckQ) -> {CTagCounts, AckQ}; subtract_acks([], Prefix, CTagCounts, AckQ) -> - {CTagCounts, queue:join(queue:from_list(lists:reverse(Prefix)), AckQ)}; + {CTagCounts, ?QUEUE:join(?QUEUE:from_list(lists:reverse(Prefix)), AckQ)}; subtract_acks([T | TL] = AckTags, Prefix, CTagCounts, AckQ) -> - case queue:out(AckQ) of + case ?QUEUE:out(AckQ) of {{value, {T, CTag}}, QTail} -> subtract_acks(TL, Prefix, maps:update_with(CTag, fun (Old) -> Old + 1 end, 1, CTagCounts), QTail); @@ -380,7 +382,7 @@ ch_record(ChPid, LimiterPid) -> Limiter = rabbit_limiter:client(LimiterPid), C = #cr{ch_pid = ChPid, monitor_ref = MonitorRef, - acktags = queue:new(), + acktags = ?QUEUE:new(), consumer_count = 0, blocked_consumers = priority_queue:new(), limiter = Limiter, @@ -393,7 +395,7 @@ ch_record(ChPid, LimiterPid) -> update_ch_record(C = #cr{consumer_count = ConsumerCount, acktags = ChAckTags, unsent_message_count = UnsentMessageCount}) -> - case {queue:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of + case {?QUEUE:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of {true, 0, 0} -> ok = erase_ch_record(C); _ -> ok = store_ch_record(C) end, |
