summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_cli/test/test_helper.exs
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbitmq_cli/test/test_helper.exs')
-rw-r--r--deps/rabbitmq_cli/test/test_helper.exs620
1 files changed, 620 insertions, 0 deletions
diff --git a/deps/rabbitmq_cli/test/test_helper.exs b/deps/rabbitmq_cli/test/test_helper.exs
new file mode 100644
index 0000000000..fca68e57bd
--- /dev/null
+++ b/deps/rabbitmq_cli/test/test_helper.exs
@@ -0,0 +1,620 @@
+## This Source Code Form is subject to the terms of the Mozilla Public
+## License, v. 2.0. If a copy of the MPL was not distributed with this
+## file, You can obtain one at https://mozilla.org/MPL/2.0/.
+##
+## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+
+four_hours = 240 * 60 * 1000
+ExUnit.configure(
+ exclude: [disabled: true],
+ module_load_timeout: four_hours,
+ timeout: four_hours)
+
+ExUnit.start()
+
+defmodule TestHelper do
+ import ExUnit.Assertions
+ alias RabbitMQ.CLI.Plugins.Helpers, as: PluginHelpers
+
+ alias RabbitMQ.CLI.Core.{CommandModules, Config, Helpers, NodeName}
+ import RabbitMQ.CLI.Core.Platform
+
+ def get_rabbit_hostname(node_name_type \\ :shortnames) do
+ Helpers.get_rabbit_hostname(node_name_type)
+ end
+
+ def hostname, do: NodeName.hostname()
+
+ def domain, do: NodeName.domain()
+
+ def fixture_file_path(filename) do
+ Path.join([File.cwd!(), "test", "fixtures", "files", filename])
+ end
+
+ def fixture_plugins_path(plugins_directory) do
+ Path.join([File.cwd!(), "test", "fixtures", "plugins", plugins_directory])
+ end
+
+ def get_cluster_name() do
+ :rpc.call(get_rabbit_hostname(), :rabbit_nodes, :cluster_name, [])
+ end
+
+ def add_vhost(name) do
+ :rpc.call(get_rabbit_hostname(), :rabbit_vhost, :add, [name, "acting-user"])
+ end
+
+ def delete_vhost(name) do
+ # some quick tests create and delete a vhost immediately, resulting
+ # in a high enough restart intensity in rabbit_vhost_sup_wrapper to
+ # make the rabbit app terminate. See https://github.com/rabbitmq/rabbitmq-server/issues/1280.
+ :timer.sleep(250)
+ :rpc.call(get_rabbit_hostname(), :rabbit_vhost, :delete, [name, "acting-user"])
+ end
+
+ def list_vhosts() do
+ :rpc.call(get_rabbit_hostname(), :rabbit_vhost, :info_all, [])
+ end
+
+ def enable_feature_flag(feature_flag) do
+ :rpc.call(get_rabbit_hostname(), :rabbit_feature_flags, :enable, [feature_flag])
+ end
+
+ def list_feature_flags(arg) do
+ :rpc.call(get_rabbit_hostname(), :rabbit_feature_flags, :list, [arg])
+ end
+
+ def add_user(name, password) do
+ :rpc.call(get_rabbit_hostname(), :rabbit_auth_backend_internal, :add_user,
+ [name, password, "acting-user"])
+ end
+
+ def delete_user(name) do
+ :rpc.call(get_rabbit_hostname(), :rabbit_auth_backend_internal, :delete_user,
+ [name, "acting-user"])
+ end
+
+ def list_users() do
+ :rpc.call(get_rabbit_hostname(), :rabbit_auth_backend_internal, :list_users, [])
+ end
+
+ def trace_on(vhost) do
+ :rpc.call(get_rabbit_hostname(), :rabbit_trace, :start, [vhost])
+ end
+
+ def trace_off(vhost) do
+ :rpc.call(get_rabbit_hostname(), :rabbit_trace, :stop, [vhost])
+ end
+
+ def set_user_tags(name, tags) do
+ :rpc.call(get_rabbit_hostname(), :rabbit_auth_backend_internal, :set_tags,
+ [name, tags, "acting-user"])
+ end
+
+ def authenticate_user(name, password) do
+ :rpc.call(get_rabbit_hostname(), :rabbit_access_control,:check_user_pass_login, [name, password])
+ end
+
+ def set_parameter(vhost, component_name, key, value) do
+ :ok = :rpc.call(get_rabbit_hostname(), :rabbit_runtime_parameters, :parse_set, [vhost, component_name, key, value, :nouser])
+ end
+
+ def clear_parameter(vhost, component_name, key) do
+ :rpc.call(get_rabbit_hostname(), :rabbit_runtime_parameters, :clear, [vhost, component_name, key, <<"acting-user">>])
+ end
+
+ def list_parameters(vhost) do
+ :rpc.call(get_rabbit_hostname(), :rabbit_runtime_parameters, :list_formatted, [vhost])
+ end
+
+ def set_global_parameter(key, value) do
+ :ok = :rpc.call(get_rabbit_hostname(), :rabbit_runtime_parameters, :parse_set_global,
+ [key, value, "acting-user"])
+ end
+
+ def clear_global_parameter(key) do
+ :rpc.call(get_rabbit_hostname(), :rabbit_runtime_parameters, :clear_global,
+ [key, "acting-user"])
+ end
+
+ def list_global_parameters() do
+ :rpc.call(get_rabbit_hostname(), :rabbit_runtime_parameters, :list_global_formatted, [])
+ end
+
+ def set_permissions(user, vhost, [conf, write, read]) do
+ :rpc.call(get_rabbit_hostname(), :rabbit_auth_backend_internal, :set_permissions, [user, vhost, conf, write, read, "acting-user"])
+ end
+
+ def list_policies(vhost) do
+ :rpc.call(get_rabbit_hostname(), :rabbit_policy, :list_formatted, [vhost])
+ end
+
+ def set_policy(vhost, name, pattern, value) do
+ {:ok, decoded} = :rabbit_json.try_decode(value)
+ parsed = :maps.to_list(decoded)
+ :ok = :rpc.call(get_rabbit_hostname(), :rabbit_policy, :set, [vhost, name, pattern, parsed, 0, "all", "acting-user"])
+ end
+
+ def clear_policy(vhost, key) do
+ :rpc.call(get_rabbit_hostname(), :rabbit_policy, :delete, [vhost, key, "acting-user"])
+ end
+
+ def list_operator_policies(vhost) do
+ :rpc.call(get_rabbit_hostname(), :rabbit_policy, :list_formatted_op, [vhost])
+ end
+
+ def set_operator_policy(vhost, name, pattern, value) do
+ {:ok, decoded} = :rabbit_json.try_decode(value)
+ parsed = :maps.to_list(decoded)
+ :ok = :rpc.call(get_rabbit_hostname(), :rabbit_policy, :set_op, [vhost, name, pattern, parsed, 0, "all", "acting-user"])
+ end
+
+ def clear_operator_policy(vhost, key) do
+ :rpc.call(get_rabbit_hostname(), :rabbit_policy, :delete_op, [vhost, key, "acting-user"])
+ end
+
+ def declare_queue(name, vhost, durable \\ false, auto_delete \\ false, args \\ [], owner \\ :none) do
+ queue_name = :rabbit_misc.r(vhost, :queue, name)
+ :rpc.call(get_rabbit_hostname(),
+ :rabbit_amqqueue, :declare,
+ [queue_name, durable, auto_delete, args, owner, "acting-user"])
+ end
+
+ def delete_queue(name, vhost) do
+ queue_name = :rabbit_misc.r(vhost, :queue, name)
+ :rpc.call(get_rabbit_hostname(),
+ :rabbit_amqqueue, :delete,
+ [queue_name, false, false, "acting-user"])
+ end
+
+ def lookup_queue(name, vhost) do
+ queue_name = :rabbit_misc.r(vhost, :queue, name)
+ :rpc.call(get_rabbit_hostname(),
+ :rabbit_amqqueue, :lookup,
+ [queue_name])
+ end
+
+ def declare_exchange(name, vhost, type \\ :direct, durable \\ false, auto_delete \\ false, internal \\ false, args \\ []) do
+ exchange_name = :rabbit_misc.r(vhost, :exchange, name)
+ :rpc.call(get_rabbit_hostname(),
+ :rabbit_exchange, :declare,
+ [exchange_name, type, durable, auto_delete, internal, args, "acting-user"])
+ end
+
+ def list_permissions(vhost) do
+ :rpc.call(
+ get_rabbit_hostname(),
+ :rabbit_auth_backend_internal,
+ :list_vhost_permissions,
+ [vhost],
+ :infinity
+ )
+ end
+
+ def set_topic_permissions(user, vhost, exchange, writePerm, readPerm) do
+ :rpc.call(
+ get_rabbit_hostname(),
+ :rabbit_auth_backend_internal,
+ :set_topic_permissions,
+ [user, vhost, exchange, writePerm, readPerm, "acting-user"],
+ :infinity
+ )
+ end
+
+ def list_user_topic_permissions(user) do
+ :rpc.call(
+ get_rabbit_hostname(),
+ :rabbit_auth_backend_internal,
+ :list_user_topic_permissions,
+ [user],
+ :infinity
+ )
+ end
+
+ def clear_topic_permissions(user, vhost) do
+ :rpc.call(
+ get_rabbit_hostname(),
+ :rabbit_auth_backend_internal,
+ :clear_topic_permissions,
+ [user, vhost, "acting-user"],
+ :infinity
+ )
+ end
+
+ def set_vm_memory_high_watermark(limit) do
+ :rpc.call(get_rabbit_hostname(), :vm_memory_monitor, :set_vm_memory_high_watermark, [limit])
+ end
+
+ def set_disk_free_limit(limit) do
+ :rpc.call(get_rabbit_hostname(), :rabbit_disk_monitor, :set_disk_free_limit, [limit])
+ end
+
+
+ #
+ # App lifecycle
+ #
+
+ def await_rabbitmq_startup() do
+ await_rabbitmq_startup_with_retries(100)
+ end
+
+ def await_rabbitmq_startup_with_retries(0) do
+ throw({:error, "Failed to call rabbit.await_startup/0 with retries: node #{get_rabbit_hostname()} was down"})
+ end
+ def await_rabbitmq_startup_with_retries(retries_left) do
+ case :rabbit_misc.rpc_call(get_rabbit_hostname(), :rabbit, :await_startup, []) do
+ :ok ->
+ :ok
+ {:badrpc, :nodedown} ->
+ :timer.sleep(50)
+ await_rabbitmq_startup_with_retries(retries_left - 1)
+ end
+ end
+
+ def await_condition(fun, timeout) do
+ retries = Integer.floor_div(timeout, 50)
+ await_condition_with_retries(fun, retries)
+ end
+
+ def await_condition_with_retries(_fun, 0) do
+ throw({:error, "Condition did not materialize"})
+ end
+ def await_condition_with_retries(fun, retries_left) do
+ case fun.() do
+ true -> :ok
+ _ ->
+ :timer.sleep(50)
+ await_condition_with_retries(fun, retries_left - 1)
+ end
+ end
+
+ def is_rabbitmq_app_running() do
+ :rabbit_misc.rpc_call(get_rabbit_hostname(), :rabbit, :is_booted, [])
+ end
+
+ def start_rabbitmq_app do
+ :rabbit_misc.rpc_call(get_rabbit_hostname(), :rabbit, :start, [])
+ await_rabbitmq_startup()
+ :timer.sleep(250)
+ end
+
+ def stop_rabbitmq_app do
+ :rabbit_misc.rpc_call(get_rabbit_hostname(), :rabbit, :stop, [])
+ :timer.sleep(1200)
+ end
+
+ def drain_node() do
+ :rpc.call(get_rabbit_hostname(), :rabbit_maintenance, :drain, [])
+ end
+
+ def revive_node() do
+ :rpc.call(get_rabbit_hostname(), :rabbit_maintenance, :revive, [])
+ end
+
+ def is_draining_node() do
+ node = get_rabbit_hostname()
+ :rpc.call(node, :rabbit_maintenance, :is_being_drained_local_read, [node])
+ end
+
+ def status do
+ :rpc.call(get_rabbit_hostname(), :rabbit, :status, [])
+ end
+
+ def error_check(cmd_line, code) do
+ assert catch_exit(RabbitMQCtl.main(cmd_line)) == {:shutdown, code}
+ end
+
+ def with_channel(vhost, fun) do
+ with_connection(vhost,
+ fn(conn) ->
+ {:ok, chan} = AMQP.Channel.open(conn)
+ AMQP.Confirm.select(chan)
+ fun.(chan)
+ end)
+ end
+
+ def with_connection(vhost, fun) do
+ Application.ensure_all_started(:amqp)
+ {:ok, conn} = AMQP.Connection.open(virtual_host: vhost)
+ ExUnit.Callbacks.on_exit(fn ->
+ try do
+ :amqp_connection.close(conn, 1000)
+ catch
+ :exit, _ -> :ok
+ end
+ end)
+ fun.(conn)
+ end
+
+ def with_connections(vhosts, fun) do
+ Application.ensure_all_started(:amqp)
+ conns = for v <- vhosts do
+ {:ok, conn} = AMQP.Connection.open(virtual_host: v)
+ conn
+ end
+ ExUnit.Callbacks.on_exit(fn ->
+ try do
+ for c <- conns, do: :amqp_connection.close(c, 1000)
+ catch
+ :exit, _ -> :ok
+ end
+ end)
+ fun.(conns)
+ end
+
+ def message_count(vhost, queue_name) do
+ with_channel(vhost, fn(channel) ->
+ {:ok, %{message_count: mc}} = AMQP.Queue.declare(channel, queue_name)
+ mc
+ end)
+ end
+
+ def publish_messages(vhost, queue_name, count) do
+ with_channel(vhost, fn(channel) ->
+ AMQP.Queue.purge(channel, queue_name)
+ for i <- 1..count do
+ AMQP.Basic.publish(channel, "", queue_name,
+ "test_message" <> Integer.to_string(i))
+ end
+ AMQP.Confirm.wait_for_confirms(channel, 30)
+ end)
+ end
+
+ def await_no_client_connections(node, timeout) do
+ iterations = timeout / 10
+ await_no_client_connections_with_iterations(node, iterations)
+ end
+
+ def await_no_client_connections_with_iterations(_node, n) when n <= 0 do
+ flunk "Ran out of retries, still have active client connections"
+ end
+ def await_no_client_connections_with_iterations(node, n) when n > 0 do
+ case :rpc.call(node, :rabbit_networking, :connections_local, []) do
+ [] -> :ok
+ _xs ->
+ :timer.sleep(10)
+ await_no_client_connections_with_iterations(node, n - 1)
+ end
+ end
+
+ def close_all_connections(node) do
+ # we intentionally use connections_local/0 here because connections/0,
+ # the cluster-wide version, loads some bits around cluster membership
+ # that are not normally ready with a single node. MK.
+ #
+ # when/if we decide to test
+ # this project against a cluster of nodes this will need revisiting. MK.
+ for pid <- :rpc.call(node, :rabbit_networking, :connections_local, []) do
+ :rpc.call(node, :rabbit_networking, :close_connection, [pid, :force_closed])
+ end
+ await_no_client_connections(node, 5000)
+ end
+
+ def expect_client_connection_failure() do
+ expect_client_connection_failure("/")
+ end
+ def expect_client_connection_failure(vhost) do
+ Application.ensure_all_started(:amqp)
+ assert {:error, :econnrefused} == AMQP.Connection.open(virtual_host: vhost)
+ end
+
+ def delete_all_queues() do
+ try do
+ immediately_delete_all_queues(:rabbit_amqqueue.list())
+ catch
+ _, _ -> :ok
+ end
+ end
+
+ def delete_all_queues(vhost) do
+ try do
+ immediately_delete_all_queues(:rabbit_amqqueue.list(vhost))
+ catch
+ _, _ -> :ok
+ end
+ end
+
+ defp immediately_delete_all_queues(qs) do
+ for q <- qs do
+ try do
+ :rpc.call(
+ get_rabbit_hostname(),
+ :rabbit_amqeueue,
+ :delete,
+ [q, false, false],
+ 5000
+ )
+ catch
+ _, _ -> :ok
+ end
+ end
+ end
+
+ def reset_vm_memory_high_watermark() do
+ try do
+ :rpc.call(
+ get_rabbit_hostname(),
+ :vm_memory_monitor,
+ :set_vm_memory_high_watermark,
+ [0.4],
+ 5000
+ )
+ catch
+ _, _ -> :ok
+ end
+ end
+
+ def emit_list_multiple_sources(list1, list2, ref, pid) do
+ pids = for list <- [list1, list2], do: Kernel.spawn_link(TestHelper, :emit_list, [list, ref, pid])
+ :rabbit_control_misc.await_emitters_termination(pids)
+ end
+
+ def emit_list(list, ref, pid) do
+ emit_list_map(list, &(&1), ref, pid)
+ end
+
+ def emit_list_map(list, fun, ref, pid) do
+ :rabbit_control_misc.emitting_map(pid, ref, fun, list)
+ end
+
+ def run_command_to_list(command, args) do
+ res = Kernel.apply(command, :run, args)
+ case Enumerable.impl_for(res) do
+ nil -> res;
+ _ -> Enum.to_list(res)
+ end
+ end
+
+ def vhost_exists?(vhost) do
+ Enum.any?(list_vhosts(), fn(v) -> v[:name] == vhost end)
+ end
+
+ def set_enabled_plugins(plugins, mode, node, opts) do
+ {:ok, enabled} = PluginHelpers.set_enabled_plugins(plugins, opts)
+
+ PluginHelpers.update_enabled_plugins(enabled, mode, node, opts)
+ end
+
+ def currently_active_plugins(context) do
+ Enum.sort(:rabbit_misc.rpc_call(context[:opts][:node], :rabbit_plugins, :active, []))
+ end
+
+ def enable_federation_plugin() do
+ node = get_rabbit_hostname()
+ {:ok, plugins_file} = :rabbit_misc.rpc_call(node,
+ :application, :get_env,
+ [:rabbit, :enabled_plugins_file])
+ {:ok, plugins_dir} = :rabbit_misc.rpc_call(node,
+ :application, :get_env,
+ [:rabbit, :plugins_dir])
+ rabbitmq_home = :rabbit_misc.rpc_call(node, :code, :lib_dir, [:rabbit])
+ {:ok, [_enabled_plugins]} = :file.consult(plugins_file)
+
+ opts = %{enabled_plugins_file: plugins_file,
+ plugins_dir: plugins_dir,
+ rabbitmq_home: rabbitmq_home,
+ online: true, offline: false}
+
+ plugins = currently_active_plugins(%{opts: %{node: node}})
+ case Enum.member?(plugins, :rabbitmq_federation) do
+ true -> :ok
+ false ->
+ set_enabled_plugins(plugins ++ [:rabbitmq_federation], :online, get_rabbit_hostname(), opts)
+ end
+ end
+
+ def set_vhost_limits(vhost, limits) do
+ :rpc.call(get_rabbit_hostname(),
+ :rabbit_vhost_limit, :parse_set, [vhost, limits, <<"acting-user">>])
+ end
+ def get_vhost_limits(vhost) do
+ :rpc.call(get_rabbit_hostname(), :rabbit_vhost_limit, :list, [vhost])
+ |> Map.new
+ end
+
+ def clear_vhost_limits(vhost) do
+ :rpc.call(get_rabbit_hostname(), :rabbit_vhost_limit, :clear, [vhost, <<"acting-user">>])
+ end
+
+ def resume_all_client_listeners() do
+ :rpc.call(get_rabbit_hostname(), :rabbit_maintenance, :resume_all_client_listeners, [])
+ end
+
+ def suspend_all_client_listeners() do
+ :rpc.call(get_rabbit_hostname(), :rabbit_maintenance, :suspend_all_client_listeners, [])
+ end
+
+ def set_user_limits(user, limits) do
+ :rpc.call(get_rabbit_hostname(),
+ :rabbit_auth_backend_internal, :set_user_limits, [user, limits, <<"acting-user">>])
+ end
+
+ def get_user_limits(user) do
+ :rpc.call(get_rabbit_hostname(), :rabbit_auth_backend_internal, :get_user_limits, [user])
+ |> Map.new
+ end
+
+ def clear_user_limits(user) do
+ clear_user_limits user, "max-connections"
+ clear_user_limits user, "max-channels"
+ end
+
+ def clear_user_limits(user, limittype) do
+ :rpc.call(get_rabbit_hostname(),
+ :rabbit_auth_backend_internal, :clear_user_limits, [user, limittype, <<"acting-user">>])
+ end
+
+ def set_scope(scope) do
+ script_name = Config.get_option(:script_name, %{})
+ scopes = Keyword.put(Application.get_env(:rabbitmqctl, :scopes), script_name, scope)
+ Application.put_env(:rabbitmqctl, :scopes, scopes)
+ CommandModules.load(%{})
+ end
+
+ def switch_plugins_directories(old_value, new_value) do
+ :rabbit_misc.rpc_call(get_rabbit_hostname(), :application, :set_env,
+ [:rabbit, :plugins_dir, new_value])
+ ExUnit.Callbacks.on_exit(fn ->
+ :rabbit_misc.rpc_call(get_rabbit_hostname(), :application, :set_env,
+ [:rabbit, :plugins_dir, old_value])
+ end)
+ end
+
+ def get_opts_with_non_existing_plugins_directory(context) do
+ get_opts_with_plugins_directories(context, ["/tmp/non_existing_rabbitmq_dummy_plugins"])
+ end
+
+ def get_opts_with_plugins_directories(context, plugins_directories) do
+ opts = context[:opts]
+ plugins_dir = opts[:plugins_dir]
+ all_directories = Enum.join([to_string(plugins_dir) | plugins_directories], path_separator())
+ %{opts | plugins_dir: to_charlist(all_directories)}
+ end
+
+ def get_opts_with_existing_plugins_directory(context) do
+ extra_plugin_directory = System.tmp_dir!() |> Path.join("existing_rabbitmq_dummy_plugins")
+ File.mkdir!(extra_plugin_directory)
+ ExUnit.Callbacks.on_exit(fn ->
+ File.rm_rf(extra_plugin_directory)
+ end)
+ get_opts_with_plugins_directories(context, [extra_plugin_directory])
+ end
+
+ def check_plugins_enabled(plugins, context) do
+ {:ok, [xs]} = :file.consult(context[:opts][:enabled_plugins_file])
+ assert_equal_sets(plugins, xs)
+ end
+
+ def assert_equal_sets(a, b) do
+ asorted = Enum.sort(a)
+ bsorted = Enum.sort(b)
+ assert asorted == bsorted
+ end
+
+ def assert_stream_without_errors(stream) do
+ true = Enum.all?(stream, fn({:error, _}) -> false;
+ ({:error, _, _}) -> false;
+ (_) -> true end)
+ end
+
+ def wait_for_log_message(message, file \\ nil, attempts \\ 100) do
+ ## Assume default log is the first one
+ log_file = case file do
+ nil ->
+ [default_log | _] = :rpc.call(get_rabbit_hostname(), :rabbit_lager, :log_locations, [])
+ default_log
+ _ -> file
+ end
+ case File.read(log_file) do
+ {:ok, data} ->
+ case String.match?(data, Regex.compile!(message)) do
+ true -> :ok
+ false ->
+ :timer.sleep(100)
+ wait_for_log_message(message, log_file, attempts - 1)
+ end
+ _ ->
+ :timer.sleep(100)
+ wait_for_log_message(message, log_file, attempts - 1)
+ end
+ end
+end