summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Ansari <david.ansari@gmx.de>2021-02-22 09:31:20 +0100
committerDavid Ansari <david.ansari@gmx.de>2021-03-16 11:23:40 +0100
commita42ea8a3bbecf00f6e5d79a447963a46db2bf216 (patch)
tree57957b1d0235f11b7367ac1207867b2308865ee4
parentfa2830edcd6dd7983b36214a703c8d3ee4754009 (diff)
downloadrabbitmq-server-git-qq-metric.tar.gz
Add metric for queue availabilityqq-metric
The per-object metric returns 1 for up, and 0 for down. The aggregated metric returns the sum of queues being up. Here, `up` is equivalent to a queue being in state `running`. For every quorum queue an additional fanout happens every tick interval: The leader checks whether a majority of replicas is available in the Raft cluster. If not, the quorum queue is considered to be `down`. This change enables us to define alerts if quorum queues are down and to define SLOs on queue availability.
-rw-r--r--deps/rabbit/src/rabbit_quorum_queue.erl13
-rw-r--r--deps/rabbit/test/quorum_queue_SUITE.erl30
-rw-r--r--deps/rabbitmq_prometheus/erlang.mk2
-rw-r--r--deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl28
-rw-r--r--deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl2
5 files changed, 62 insertions, 13 deletions
diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl
index aa66c97775..4d10f78684 100644
--- a/deps/rabbit/src/rabbit_quorum_queue.erl
+++ b/deps/rabbit/src/rabbit_quorum_queue.erl
@@ -900,10 +900,19 @@ policy_changed(Q) ->
cluster_state(Name) ->
case whereis(Name) of
undefined -> down;
- _ ->
+ Pid ->
case ets:lookup(ra_state, Name) of
[{_, recover}] -> recovering;
- _ -> running
+ _ ->
+ case ra:consistent_query(Pid,
+ fun (_) -> ok end,
+ ?RPC_TIMEOUT) of
+ {ok, ok, _} ->
+ running;
+ _ ->
+ rabbit_log:debug("ra:consistent_query failed for quorum queue ~w", [Name]),
+ down
+ end
end
end.
diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl
index 098c24b8c5..bef67374c7 100644
--- a/deps/rabbit/test/quorum_queue_SUITE.erl
+++ b/deps/rabbit/test/quorum_queue_SUITE.erl
@@ -66,6 +66,7 @@ groups() ->
metrics_cleanup_on_leadership_takeover,
metrics_cleanup_on_leader_crash,
consume_in_minority,
+ state_in_minority,
shrink_all,
rebalance,
file_handle_reservations,
@@ -693,6 +694,31 @@ consume_in_minority(Config) ->
ok = rabbit_ct_broker_helpers:start_node(Config, Server2),
ok.
+state_in_minority(Config) ->
+ [Server0, Server1, Server2] =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ RaName = ra_name(QQ),
+ {ok, _, {_, Leader}} = ra:members({RaName, Server0}),
+ QNameRes = rabbit_misc:r(<<"/">>, queue, QQ),
+ AssertState = fun(ExpectedState) ->
+ timer:sleep(1000),
+ [{_, PropList, _}] = rpc:call(Leader, ets, lookup, [queue_metrics, QNameRes]),
+ ?assertEqual(ExpectedState, proplists:get_value(state, PropList))
+ end,
+
+ AssertState(running),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server2),
+ AssertState(down),
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server2),
+ AssertState(running).
+
shrink_all(Config) ->
[Server0, Server1, Server2] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -2522,9 +2548,7 @@ consumer_metrics(Config) ->
timer:sleep(5000),
QNameRes = rabbit_misc:r(<<"/">>, queue, QQ),
[{_, PropList, _}] = rpc:call(Leader, ets, lookup, [queue_metrics, QNameRes]),
- ?assertMatch([{consumers, 1}], lists:filter(fun({Key, _}) ->
- Key == consumers
- end, PropList)).
+ ?assertEqual(1, proplists:get_value(consumers, PropList)).
delete_if_empty(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
diff --git a/deps/rabbitmq_prometheus/erlang.mk b/deps/rabbitmq_prometheus/erlang.mk
index 4bfafd97de..bb5f208993 100644
--- a/deps/rabbitmq_prometheus/erlang.mk
+++ b/deps/rabbitmq_prometheus/erlang.mk
@@ -6640,7 +6640,7 @@ help::
" ct Run all the common_test suites for this project" \
"" \
"All your common_test suites have their associated targets." \
- "A suite named http_SUITE can be ran using the ct-http target."
+ "A suite named http_SUITE can be run using the ct-http target."
# Plugin-specific targets.
diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl
index f06ab643e3..dfa931de77 100644
--- a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl
+++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl
@@ -36,7 +36,7 @@
%% ==How to determine if a metric should be of type GAUGE or COUNTER?==
%%
%% * GAUGE if you care about its value rather than rate of change
-%% - value can decrease as well as decrease
+%% - value can increase as well as decrease
%% * COUNTER if you care about the rate of change
%% - value can only increase
%%
@@ -188,7 +188,8 @@
{2, undefined, queue_messages_paged_out, gauge, "Messages paged out to disk", messages_paged_out},
{2, undefined, queue_messages_paged_out_bytes, gauge, "Size in bytes of messages paged out to disk", message_bytes_paged_out},
{2, undefined, queue_disk_reads_total, counter, "Total number of times queue read messages from disk", disk_reads},
- {2, undefined, queue_disk_writes_total, counter, "Total number of times queue wrote messages to disk", disk_writes}
+ {2, undefined, queue_disk_writes_total, counter, "Total number of times queue wrote messages to disk", disk_writes},
+ {2, {bool, running}, queues_up, gauge, "Number of queues in state running", state}
]},
{auth_attempt_metrics, [
@@ -318,6 +319,8 @@ mf(Callback, Contents, Data) ->
Fun = case Conversion of
undefined ->
fun(D) -> proplists:get_value(Key, element(Index, D)) end;
+ {bool, MatchPattern} ->
+ fun(D) -> bool(proplists:get_value(Key, element(Index, D)), MatchPattern) end;
BaseUnitConversionFactor ->
fun(D) -> proplists:get_value(Key, element(Index, D)) / BaseUnitConversionFactor end
end,
@@ -441,9 +444,9 @@ get_data(channel_metrics = Table, false) ->
{messages_uncommitted, A4}, {acks_uncommitted, A5}, {prefetch_count, A6},
{global_prefetch_count, A7}]}];
get_data(queue_metrics = Table, false) ->
- {Table, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16} =
+ {Table, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17} =
ets:foldl(fun({_, Props, _}, {T, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10,
- A11, A12, A13, A14, A15, A16}) ->
+ A11, A12, A13, A14, A15, A16, A17}) ->
{T,
sum(proplists:get_value(consumers, Props), A1),
sum(proplists:get_value(consumer_utilisation, Props), A2),
@@ -460,7 +463,8 @@ get_data(queue_metrics = Table, false) ->
sum(proplists:get_value(messages_paged_out, Props), A13),
sum(proplists:get_value(message_bytes_paged_out, Props), A14),
sum(proplists:get_value(disk_reads, Props), A15),
- sum(proplists:get_value(disk_writes, Props), A16)
+ sum(proplists:get_value(disk_writes, Props), A16),
+ sum(bool(proplists:get_value(state, Props), running), A17)
}
end, empty(Table), Table),
[{Table, [{consumers, A1}, {consumer_utilisation, A2}, {memory, A3}, {messages_ram, A4},
@@ -469,7 +473,7 @@ get_data(queue_metrics = Table, false) ->
{messages_bytes_persistent, A9}, {message_bytes, A10},
{message_bytes_ready, A11}, {message_bytes_unacknowledged, A12},
{messages_paged_out, A13}, {message_bytes_paged_out, A14},
- {disk_reads, A15}, {disk_writes, A16}]}];
+ {disk_reads, A15}, {disk_writes, A16}, {state, A17}]}];
get_data(Table, false) when Table == channel_exchange_metrics;
Table == queue_coarse_metrics;
Table == channel_queue_metrics;
@@ -525,7 +529,7 @@ empty(T) when T == ra_metrics ->
empty(T) when T == channel_queue_metrics; T == channel_metrics ->
{T, 0, 0, 0, 0, 0, 0, 0};
empty(queue_metrics = T) ->
- {T, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}.
+ {T, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}.
sum(undefined, B) ->
B;
@@ -533,3 +537,13 @@ sum('', B) ->
B;
sum(A, B) ->
A + B.
+
+bool(Object, MatchPattern) ->
+ case Object of
+ MatchPattern ->
+ 1;
+ Aggregate when is_integer(Aggregate) ->
+ Aggregate;
+ _ ->
+ 0
+ end.
diff --git a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl
index d786f1832a..7bd3475547 100644
--- a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl
+++ b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl
@@ -202,6 +202,7 @@ aggregated_metrics_test(Config) ->
?assertEqual(match, re:run(Body, "^# TYPE", [{capture, none}, multiline])),
?assertEqual(match, re:run(Body, "^# HELP", [{capture, none}, multiline])),
?assertEqual(nomatch, re:run(Body, ?config(queue_name, Config), [{capture, none}])),
+ ?assertEqual(match, re:run(Body, "^rabbitmq_queues_up 1$", [{capture, none}, multiline])),
%% Check the first metric value from each ETS table owned by rabbitmq_metrics
?assertEqual(match, re:run(Body, "^rabbitmq_channel_consumers ", [{capture, none}, multiline])),
?assertEqual(match, re:run(Body, "^rabbitmq_channel_messages_published_total ", [{capture, none}, multiline])),
@@ -238,6 +239,7 @@ per_object_metrics_test(Config, Path) ->
?assertEqual(match, re:run(Body, "^# TYPE", [{capture, none}, multiline])),
?assertEqual(match, re:run(Body, "^# HELP", [{capture, none}, multiline])),
?assertEqual(match, re:run(Body, ?config(queue_name, Config), [{capture, none}])),
+ ?assertEqual(match, re:run(Body, "^rabbitmq_queues_up{.*} 1$", [{capture, none}, multiline])),
%% Check the first metric value from each ETS table owned by rabbitmq_metrics
?assertEqual(match, re:run(Body, "^rabbitmq_channel_consumers{", [{capture, none}, multiline])),
?assertEqual(match, re:run(Body, "^rabbitmq_channel_messages_published_total{", [{capture, none}, multiline])),