summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordcorbacho <dparracorbacho@piotal.io>2019-10-10 11:51:44 +0100
committerdcorbacho <dparracorbacho@piotal.io>2019-10-17 11:43:28 +0100
commitc37f81ef351c65e056e8c14a95e209a55dc75ed2 (patch)
tree0cb22d611328e8255bbeb6565837414fbf63245b
parent57ff7b30929e094de17a72b000e18f7e6cf00baa (diff)
downloadrabbitmq-server-git-c37f81ef351c65e056e8c14a95e209a55dc75ed2.tar.gz
Update core metrics with down state
As the processes responsible for updating `queue_metrics` are the queues themselves, if they crash nothing else updates their metric state to down, so they keep showing as running in the UI. This change uses the existing gc process to check the state of the local queues and update it if required, while it is doing the normal gc scanning. It covers all types of queues. We consider a queue as down if the process/master/leader does not answer requests or is dead. There could be other situations where a queue is functionally down, but those are not covered here. [#163218289]
-rw-r--r--src/rabbit_amqqueue.erl15
-rw-r--r--src/rabbit_core_metrics_gc.erl22
2 files changed, 35 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 925e7915cb..ff83bc9d04 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -32,7 +32,7 @@
emit_info_local/4, emit_info_down/4]).
-export([count/0]).
-export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0,
- list_with_possible_retry/1]).
+ list_local_names_down/0, list_with_possible_retry/1]).
-export([list_by_type/1]).
-export([force_event_refresh/1, notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]).
@@ -941,6 +941,19 @@ list_local_names() ->
[ amqqueue:get_name(Q) || Q <- list(),
amqqueue:get_state(Q) =/= crashed, is_local_to_node(amqqueue:get_pid(Q), node())].
+%% Names of the queues that are local to this node (per is_local_to_node/2)
+%% and are currently considered down (per is_down/1).
+%%
+%% Comprehension filters are evaluated left to right, so the locality test
+%% comes first: is_down/1 asks for the queue's state through info/2, and
+%% there is no point doing that for queues that would be discarded by the
+%% locality filter anyway. The resulting list is the same as with the
+%% filters in the opposite order.
+list_local_names_down() ->
+    [ amqqueue:get_name(Q) || Q <- list(),
+                              is_local_to_node(amqqueue:get_pid(Q), node()),
+                              is_down(Q)].
+
+%% Returns true when info/2 reports `down` as the queue's state, or when
+%% asking for that state raises any exception; returns false otherwise.
+%% The catch-all is deliberate: a queue whose state cannot be obtained is
+%% reported as down instead of crashing the caller.
+is_down(Q) ->
+    try info(Q, [state]) of
+        [{state, down}] -> true;
+        _Other -> false
+    catch
+        _:_ -> true
+    end.
+
-spec list_by_type(atom()) -> [amqqueue:amqqueue()].
list_by_type(Type) ->
diff --git a/src/rabbit_core_metrics_gc.erl b/src/rabbit_core_metrics_gc.erl
index 99ad8eef34..326804e342 100644
--- a/src/rabbit_core_metrics_gc.erl
+++ b/src/rabbit_core_metrics_gc.erl
@@ -75,8 +75,10 @@ gc_queues() ->
gc_local_queues() ->
Queues = rabbit_amqqueue:list_local_names(),
+ QueuesDown = rabbit_amqqueue:list_local_names_down(),
GbSet = gb_sets:from_list(Queues),
- gc_entity(queue_metrics, GbSet),
+ GbSetDown = gb_sets:from_list(QueuesDown),
+ gc_queue_metrics(GbSet, GbSetDown),
gc_entity(queue_coarse_metrics, GbSet),
Followers = gb_sets:from_list(rabbit_amqqueue:list_local_followers()),
gc_leader_data(Followers).
@@ -133,6 +135,24 @@ gc_process(Pid, Table, Key) ->
none
end.
+%% Sweeps the queue_metrics table once.
+%%   GbSet     - gb_set of queue names that should keep their row
+%%   GbSetDown - gb_set of queue names whose row must show {state, down}
+%% Rows whose key is not in GbSet are deleted. Rows whose key is in both sets
+%% get their props rewritten with {state, down} at the head and any previous
+%% state entry removed. All other rows are left untouched.
+%% Returns `none`, the fold accumulator.
+gc_queue_metrics(GbSet, GbSetDown) ->
+    Tab = queue_metrics,
+    Sweep = fun({QName, Infos, Marker}, none) ->
+                    Known = gb_sets:is_member(QName, GbSet),
+                    %% GbSetDown is only consulted for rows that are kept.
+                    Down = Known andalso gb_sets:is_member(QName, GbSetDown),
+                    case {Known, Down} of
+                        {false, _} ->
+                            ets:delete(Tab, QName);
+                        {true, true} ->
+                            DownInfos = [{state, down} | lists:keydelete(state, 1, Infos)],
+                            ets:insert(Tab, {QName, DownInfos, Marker});
+                        {true, false} ->
+                            ok
+                    end,
+                    none
+            end,
+    ets:foldl(Sweep, none, Tab).
+
gc_entity(Table, GbSet) ->
ets:foldl(fun({{_, Id} = Key, _}, none) ->
gc_entity(Id, Table, Key, GbSet);