diff options
author | dcorbacho <dparracorbacho@piotal.io> | 2021-03-25 23:11:58 +0100 |
---|---|---|
committer | dcorbacho <dparracorbacho@piotal.io> | 2021-03-25 23:11:58 +0100 |
commit | 2c4cf3cdf7bc4eba4ddd87d63386d0606f838470 (patch) | |
tree | 493fd8d309374a574922f68a049f86ed23539040 | |
parent | 2eac4debbf96b6eb632e143f51fc545eb8ae4abe (diff) | |
download | rabbitmq-server-git-stream-status-command.tar.gz |
CLI command to for stream detailsstream-status-command
rabbitmq-queues stream_status [--tracking] <queue>
If the --tracking option is enabled, it returns a table with all offset tracking
for that queue.
3 files changed, 163 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 6024986e7c..ff77c22fef 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -46,6 +46,9 @@ -export([format_osiris_event/2]). -export([update_stream_conf/2]). +-export([status/2, + tracking_status/2]). + -include_lib("rabbit_common/include/rabbit.hrl"). -include("amqqueue.hrl"). @@ -491,6 +494,50 @@ i(type, _) -> stream; i(_, _) -> ''. +-spec status(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) -> + [[{binary(), term()}]] | {error, term()}. +status(Vhost, QueueName) -> + %% Handle not found queues + QName = #resource{virtual_host = Vhost, name = QueueName, kind = queue}, + case rabbit_amqqueue:lookup(QName) of + {ok, Q} when ?amqqueue_is_classic(Q) -> + {error, classic_queue_not_supported}; + {ok, Q} when ?amqqueue_is_quorum(Q) -> + {error, quorum_queue_not_supported}; + {ok, Q} when ?amqqueue_is_stream(Q) -> + Data = osiris_counters:overview(), + case maps:get({osiris_writer, QName}, Data, undefined) of + undefined -> + #{}; + #{segments := Segments} = Map -> + Conf = amqqueue:get_type_state(Q), + Max = maps:get(max_segment_size, Conf, osiris_log:get_default_max_segment_size()), + Map#{max_disk_size => Segments * Max} + end; + {error, not_found} = E-> + E + end. + +-spec tracking_status(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) -> + [[{atom(), term()}]] | {error, term()}. +tracking_status(Vhost, QueueName) -> + %% Handle not found queues + QName = #resource{virtual_host = Vhost, name = QueueName, kind = queue}, + case rabbit_amqqueue:lookup(QName) of + {ok, Q} when ?amqqueue_is_classic(Q) -> + {error, classic_queue_not_supported}; + {ok, Q} when ?amqqueue_is_quorum(Q) -> + {error, quorum_queue_not_supported}; + {ok, Q} when ?amqqueue_is_stream(Q) -> + Leader = amqqueue:get_pid(Q), + Map = osiris:read_tracking(Leader), + maps:fold(fun(K, V, Acc) -> + [[{reference, K}, + {offset, V}] | Acc] + end, [], Map); + {error, not_found} = E-> + E + end. init(Q) when ?is_amqqueue(Q) -> Leader = amqqueue:get_pid(Q), diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/stream_status_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/stream_status_command.ex new file mode 100644 index 0000000000..981ac03176 --- /dev/null +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/stream_status_command.ex @@ -0,0 +1,71 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved. + +defmodule RabbitMQ.CLI.Queues.Commands.StreamStatusCommand do + alias RabbitMQ.CLI.Core.DocGuide + + @behaviour RabbitMQ.CLI.CommandBehaviour + def scopes(), do: [:diagnostics, :queues] + + def merge_defaults(args, opts), do: {args, Map.merge(%{tracking: false, vhost: "/"}, opts)} + + def switches(), do: [tracking: :boolean] + + use RabbitMQ.CLI.Core.AcceptsOnePositionalArgument + use RabbitMQ.CLI.Core.RequiresRabbitAppRunning + + def run([name] = _args, %{node: node_name, vhost: vhost, tracking: :false}) do + case :rabbit_misc.rpc_call(node_name, :rabbit_stream_queue, :status, [vhost, name]) do + {:error, :classic_queue_not_supported} -> + {:error, "Cannot get stream status of a classic queue"} + + {:error, :quorum_queue_not_supported} -> + {:error, "Cannot get stream status of a quorum queue"} + + other -> + other + end + end + def run([name] = _args, %{node: node_name, vhost: vhost, tracking: :true}) do + case :rabbit_misc.rpc_call(node_name, :rabbit_stream_queue, :tracking_status, [vhost, name]) do + {:error, :classic_queue_not_supported} -> + {:error, "Cannot get stream status of a classic queue"} + + {:error, :quorum_queue_not_supported} -> + {:error, "Cannot get stream status of a quorum queue"} + + other -> + other + end + end + + use RabbitMQ.CLI.DefaultOutput + + def formatter(), do: RabbitMQ.CLI.Formatters.PrettyTable + + def usage() do + "stream_status [--vhost <vhost>] [--tracking] <queue>" + end + + def usage_additional do + [ + ["<queue>", "Name of the queue"] + ] + end + + def usage_doc_guides() do + [ + DocGuide.stream_queues() + ] + end + + def help_section(), do: :observability_and_health_checks + + def description(), do: "Displays the status of a stream queue" + + def banner([name], %{node: node_name}), + do: "Status of stream queue #{name} on node #{node_name} ..." +end diff --git a/deps/rabbitmq_cli/test/queues/stream_status_command_test.exs b/deps/rabbitmq_cli/test/queues/stream_status_command_test.exs new file mode 100644 index 0000000000..b7b98a30e6 --- /dev/null +++ b/deps/rabbitmq_cli/test/queues/stream_status_command_test.exs @@ -0,0 +1,45 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. + +defmodule RabbitMQ.CLI.Queues.Commands.StreamStatusCommandTest do + use ExUnit.Case, async: false + import TestHelper + + @command RabbitMQ.CLI.Queues.Commands.StreamStatusCommand + + setup_all do + RabbitMQ.CLI.Core.Distribution.start() + + :ok + end + + setup context do + {:ok, opts: %{ + node: get_rabbit_hostname(), + timeout: context[:test_timeout] || 30000 + }} + end + + + test "validate: treats no arguments as a failure" do + assert @command.validate([], %{}) == {:validation_failure, :not_enough_args} + end + + test "validate: accepts a single positional argument" do + assert @command.validate(["stream-queue-a"], %{}) == :ok + end + + test "validate: when two or more arguments are provided, returns a failure" do + assert @command.validate(["stream-queue-a", "one-extra-arg"], %{}) == {:validation_failure, :too_many_args} + assert @command.validate(["stream-queue-a", "extra-arg", "another-extra-arg"], %{}) == {:validation_failure, :too_many_args} + end + + @tag test_timeout: 3000 + test "run: targeting an unreachable node throws a badrpc" do + assert match?({:badrpc, _}, @command.run(["stream-queue-a"], + %{node: :jake@thedog, vhost: "/", timeout: 200, tracking: false})) + end +end |