diff options
author | David Ansari <david.ansari@gmx.de> | 2021-02-22 09:31:20 +0100 |
---|---|---|
committer | David Ansari <david.ansari@gmx.de> | 2021-03-16 11:23:40 +0100 |
commit | a42ea8a3bbecf00f6e5d79a447963a46db2bf216 (patch) | |
tree | 57957b1d0235f11b7367ac1207867b2308865ee4 | |
parent | fa2830edcd6dd7983b36214a703c8d3ee4754009 (diff) | |
download | rabbitmq-server-git-qq-metric.tar.gz |
Add metric for queue availability [qq-metric]
The per-object metric returns 1 for up, and 0 for down.
The aggregated metric returns the sum of queues being up.
Here, `up` is equivalent to a queue being in state `running`.
For every quorum queue an additional fanout happens every tick interval:
The leader checks whether a majority of replicas is available in the
Raft cluster. If not, the quorum queue is considered to be `down`.
This change enables us to define alerts if quorum queues are down and to
define SLOs on queue availability.
5 files changed, 62 insertions, 13 deletions
diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index aa66c97775..4d10f78684 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -900,10 +900,19 @@ policy_changed(Q) -> cluster_state(Name) -> case whereis(Name) of undefined -> down; - _ -> + Pid -> case ets:lookup(ra_state, Name) of [{_, recover}] -> recovering; - _ -> running + _ -> + case ra:consistent_query(Pid, + fun (_) -> ok end, + ?RPC_TIMEOUT) of + {ok, ok, _} -> + running; + _ -> + rabbit_log:debug("ra:consistent_query failed for quorum queue ~w", [Name]), + down + end end end. diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 098c24b8c5..bef67374c7 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -66,6 +66,7 @@ groups() -> metrics_cleanup_on_leadership_takeover, metrics_cleanup_on_leader_crash, consume_in_minority, + state_in_minority, shrink_all, rebalance, file_handle_reservations, @@ -693,6 +694,31 @@ consume_in_minority(Config) -> ok = rabbit_ct_broker_helpers:start_node(Config, Server2), ok. 
+state_in_minority(Config) -> + [Server0, Server1, Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + RaName = ra_name(QQ), + {ok, _, {_, Leader}} = ra:members({RaName, Server0}), + QNameRes = rabbit_misc:r(<<"/">>, queue, QQ), + AssertState = fun(ExpectedState) -> + timer:sleep(1000), + [{_, PropList, _}] = rpc:call(Leader, ets, lookup, [queue_metrics, QNameRes]), + ?assertEqual(ExpectedState, proplists:get_value(state, PropList)) + end, + + AssertState(running), + ok = rabbit_ct_broker_helpers:stop_node(Config, Server1), + ok = rabbit_ct_broker_helpers:stop_node(Config, Server2), + AssertState(down), + ok = rabbit_ct_broker_helpers:start_node(Config, Server1), + ok = rabbit_ct_broker_helpers:start_node(Config, Server2), + AssertState(running). + shrink_all(Config) -> [Server0, Server1, Server2] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -2522,9 +2548,7 @@ consumer_metrics(Config) -> timer:sleep(5000), QNameRes = rabbit_misc:r(<<"/">>, queue, QQ), [{_, PropList, _}] = rpc:call(Leader, ets, lookup, [queue_metrics, QNameRes]), - ?assertMatch([{consumers, 1}], lists:filter(fun({Key, _}) -> - Key == consumers - end, PropList)). + ?assertEqual(1, proplists:get_value(consumers, PropList)). delete_if_empty(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), diff --git a/deps/rabbitmq_prometheus/erlang.mk b/deps/rabbitmq_prometheus/erlang.mk index 4bfafd97de..bb5f208993 100644 --- a/deps/rabbitmq_prometheus/erlang.mk +++ b/deps/rabbitmq_prometheus/erlang.mk @@ -6640,7 +6640,7 @@ help:: " ct Run all the common_test suites for this project" \ "" \ "All your common_test suites have their associated targets." \ - "A suite named http_SUITE can be ran using the ct-http target." 
+ "A suite named http_SUITE can be run using the ct-http target." # Plugin-specific targets. diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl index f06ab643e3..dfa931de77 100644 --- a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl +++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl @@ -36,7 +36,7 @@ %% ==How to determine if a metric should be of type GAUGE or COUNTER?== %% %% * GAUGE if you care about its value rather than rate of change -%% - value can decrease as well as decrease +%% - value can increase as well as decrease %% * COUNTER if you care about the rate of change %% - value can only increase %% @@ -188,7 +188,8 @@ {2, undefined, queue_messages_paged_out, gauge, "Messages paged out to disk", messages_paged_out}, {2, undefined, queue_messages_paged_out_bytes, gauge, "Size in bytes of messages paged out to disk", message_bytes_paged_out}, {2, undefined, queue_disk_reads_total, counter, "Total number of times queue read messages from disk", disk_reads}, - {2, undefined, queue_disk_writes_total, counter, "Total number of times queue wrote messages to disk", disk_writes} + {2, undefined, queue_disk_writes_total, counter, "Total number of times queue wrote messages to disk", disk_writes}, + {2, {bool, running}, queues_up, gauge, "Number of queues in state running", state} ]}, {auth_attempt_metrics, [ @@ -318,6 +319,8 @@ mf(Callback, Contents, Data) -> Fun = case Conversion of undefined -> fun(D) -> proplists:get_value(Key, element(Index, D)) end; + {bool, MatchPattern} -> + fun(D) -> bool(proplists:get_value(Key, element(Index, D)), MatchPattern) end; BaseUnitConversionFactor -> fun(D) -> proplists:get_value(Key, element(Index, D)) / BaseUnitConversionFactor end end, @@ -441,9 +444,9 @@ get_data(channel_metrics = Table, false) -> 
{messages_uncommitted, A4}, {acks_uncommitted, A5}, {prefetch_count, A6}, {global_prefetch_count, A7}]}]; get_data(queue_metrics = Table, false) -> - {Table, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16} = + {Table, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17} = ets:foldl(fun({_, Props, _}, {T, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, - A11, A12, A13, A14, A15, A16}) -> + A11, A12, A13, A14, A15, A16, A17}) -> {T, sum(proplists:get_value(consumers, Props), A1), sum(proplists:get_value(consumer_utilisation, Props), A2), @@ -460,7 +463,8 @@ get_data(queue_metrics = Table, false) -> sum(proplists:get_value(messages_paged_out, Props), A13), sum(proplists:get_value(message_bytes_paged_out, Props), A14), sum(proplists:get_value(disk_reads, Props), A15), - sum(proplists:get_value(disk_writes, Props), A16) + sum(proplists:get_value(disk_writes, Props), A16), + sum(bool(proplists:get_value(state, Props), running), A17) } end, empty(Table), Table), [{Table, [{consumers, A1}, {consumer_utilisation, A2}, {memory, A3}, {messages_ram, A4}, @@ -469,7 +473,7 @@ get_data(queue_metrics = Table, false) -> {messages_bytes_persistent, A9}, {message_bytes, A10}, {message_bytes_ready, A11}, {message_bytes_unacknowledged, A12}, {messages_paged_out, A13}, {message_bytes_paged_out, A14}, - {disk_reads, A15}, {disk_writes, A16}]}]; + {disk_reads, A15}, {disk_writes, A16}, {state, A17}]}]; get_data(Table, false) when Table == channel_exchange_metrics; Table == queue_coarse_metrics; Table == channel_queue_metrics; @@ -525,7 +529,7 @@ empty(T) when T == ra_metrics -> empty(T) when T == channel_queue_metrics; T == channel_metrics -> {T, 0, 0, 0, 0, 0, 0, 0}; empty(queue_metrics = T) -> - {T, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}. + {T, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}. sum(undefined, B) -> B; @@ -533,3 +537,13 @@ sum('', B) -> B; sum(A, B) -> A + B. 
+ +bool(Object, MatchPattern) -> + case Object of + MatchPattern -> + 1; + Aggregate when is_integer(Aggregate) -> + Aggregate; + _ -> + 0 + end. diff --git a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl index d786f1832a..7bd3475547 100644 --- a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl +++ b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl @@ -202,6 +202,7 @@ aggregated_metrics_test(Config) -> ?assertEqual(match, re:run(Body, "^# TYPE", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^# HELP", [{capture, none}, multiline])), ?assertEqual(nomatch, re:run(Body, ?config(queue_name, Config), [{capture, none}])), + ?assertEqual(match, re:run(Body, "^rabbitmq_queues_up 1$", [{capture, none}, multiline])), %% Check the first metric value from each ETS table owned by rabbitmq_metrics ?assertEqual(match, re:run(Body, "^rabbitmq_channel_consumers ", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_channel_messages_published_total ", [{capture, none}, multiline])), @@ -238,6 +239,7 @@ per_object_metrics_test(Config, Path) -> ?assertEqual(match, re:run(Body, "^# TYPE", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^# HELP", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, ?config(queue_name, Config), [{capture, none}])), + ?assertEqual(match, re:run(Body, "^rabbitmq_queues_up{.*} 1$", [{capture, none}, multiline])), %% Check the first metric value from each ETS table owned by rabbitmq_metrics ?assertEqual(match, re:run(Body, "^rabbitmq_channel_consumers{", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_channel_messages_published_total{", [{capture, none}, multiline])), |