diff options
| author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2016-12-27 16:36:08 +0100 |
|---|---|---|
| committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2016-12-27 16:36:08 +0100 |
| commit | 6a016d5b5db291ab0a99edbc794fc7ab45299c24 (patch) | |
| tree | a0259dc7303308073d6db3aa5c1e52dd58d252dd /test | |
| parent | 6afabee7738c5e56ebf9638101f11b6be728afaf (diff) | |
| parent | 30ebac8cc103cf032d5e7bdd1b5f74407f4b986f (diff) | |
| download | rabbitmq-server-git-6a016d5b5db291ab0a99edbc794fc7ab45299c24.tar.gz | |
Merge branch 'master' into rabbitmq-server-505
Diffstat (limited to 'test')
| -rw-r--r-- | test/channel_operation_timeout_test_queue.erl | 75 | ||||
| -rw-r--r-- | test/clustering_management_SUITE.erl | 28 | ||||
| -rw-r--r-- | test/eager_sync_SUITE.erl | 18 | ||||
| -rw-r--r-- | test/per_vhost_connection_limit_SUITE.erl | 2 | ||||
| -rw-r--r-- | test/per_vhost_msg_store_SUITE.erl | 254 | ||||
| -rw-r--r-- | test/plugins_SUITE.erl | 80 | ||||
| -rw-r--r-- | test/plugins_SUITE_data/plugins1/mock_rabbitmq_plugins_01-0.1.0.ez | bin | 3280 -> 0 bytes | |||
| -rw-r--r-- | test/rabbitmqctl_integration_SUITE.erl | 86 | ||||
| -rw-r--r-- | test/sync_detection_SUITE.erl | 2 | ||||
| -rw-r--r-- | test/unit_inbroker_SUITE.erl | 880 |
10 files changed, 417 insertions, 1008 deletions
diff --git a/test/channel_operation_timeout_test_queue.erl b/test/channel_operation_timeout_test_queue.erl index 4407a24e7f..124fda47b1 100644 --- a/test/channel_operation_timeout_test_queue.erl +++ b/test/channel_operation_timeout_test_queue.erl @@ -111,8 +111,8 @@ }). -define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2 --define(PERSISTENT_MSG_STORE, msg_store_persistent). --define(TRANSIENT_MSG_STORE, msg_store_transient). +-define(PERSISTENT_MSG_STORE, msg_store_persistent_vhost). +-define(TRANSIENT_MSG_STORE, msg_store_transient_vhost). -define(QUEUE, lqueue). -define(TIMEOUT_TEST_MSG, <<"timeout_test_msg!">>). @@ -215,14 +215,27 @@ start(DurableQueues) -> {AllTerms, StartFunState} = rabbit_queue_index:start(DurableQueues), - start_msg_store( - [Ref || Terms <- AllTerms, - Terms /= non_clean_shutdown, - begin - Ref = proplists:get_value(persistent_ref, Terms), - Ref =/= undefined - end], - StartFunState), + %% Group recovery terms by vhost. + {[], VhostRefs} = lists:foldl( + fun + %% We need to skip a queue name + (non_clean_shutdown, {[_|QNames], VhostRefs}) -> + {QNames, VhostRefs}; + (Terms, {[QueueName | QNames], VhostRefs}) -> + case proplists:get_value(persistent_ref, Terms) of + undefined -> {QNames, VhostRefs}; + Ref -> + #resource{virtual_host = VHost} = QueueName, + Refs = case maps:find(VHost, VhostRefs) of + {ok, Val} -> Val; + error -> [] + end, + {QNames, maps:put(VHost, [Ref|Refs], VhostRefs)} + end + end, + {DurableQueues, #{}}, + AllTerms), + start_msg_store(VhostRefs, StartFunState), {ok, AllTerms}. stop() -> @@ -230,12 +243,21 @@ stop() -> ok = rabbit_queue_index:stop(). start_msg_store(Refs, StartFunState) -> - ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store, + ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store_vhost_sup, [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(), undefined, {fun (ok) -> finished end, ok}]), - ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store, + ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store_vhost_sup, [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(), - Refs, StartFunState]). + Refs, StartFunState]), + %% Start message store for all known vhosts + VHosts = rabbit_vhost:list(), + lists:foreach( + fun(VHost) -> + rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE, VHost), + rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE, VHost) + end, + VHosts), + ok. stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), @@ -254,22 +276,26 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) -> IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), + VHost = QueueName#resource.virtual_host, init(IsDurable, IndexState, 0, 0, [], case IsDurable of true -> msg_store_client_init(?PERSISTENT_MSG_STORE, - MsgOnDiskFun, AsyncCallback); + MsgOnDiskFun, AsyncCallback, + VHost); false -> undefined end, - msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback)); + msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback, VHost)); %% We can be recovering a transient queue if it crashed init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) -> {PRef, RecoveryTerms} = process_recovery_terms(Terms), + VHost = QueueName#resource.virtual_host, {PersistentClient, ContainsCheckFun} = case IsDurable of true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, - MsgOnDiskFun, AsyncCallback), + MsgOnDiskFun, AsyncCallback, + VHost), {C, fun (MsgId) when is_binary(MsgId) -> rabbit_msg_store:contains(MsgId, C); (#basic_message{is_persistent = Persistent}) -> @@ -278,11 +304,12 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, false -> {undefined, fun(_MsgId) -> false end} end, TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, - undefined, AsyncCallback), + undefined, AsyncCallback, + VHost), {DeltaCount, DeltaBytes, IndexState} = rabbit_queue_index:recover( QueueName, RecoveryTerms, - rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), + rabbit_msg_store_vhost_sup:successfully_recovered_state(?PERSISTENT_MSG_STORE, VHost), ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms, PersistentClient, TransientClient). @@ -957,14 +984,16 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) -> end), Res. -msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) -> +msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) -> msg_store_client_init(MsgStore, rabbit_guid:gen(), MsgOnDiskFun, - Callback). + Callback, VHost). -msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) -> +msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) -> CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE), - rabbit_msg_store:client_init(MsgStore, Ref, MsgOnDiskFun, - fun () -> Callback(?MODULE, CloseFDsFun) end). + rabbit_msg_store_vhost_sup:client_init( + MsgStore, Ref, MsgOnDiskFun, + fun () -> Callback(?MODULE, CloseFDsFun) end, + VHost). msg_store_write(MSCState, IsPersistent, MsgId, Msg) -> with_immutable_msg_store_state( diff --git a/test/clustering_management_SUITE.erl b/test/clustering_management_SUITE.erl index eac5fa3683..9b605d07ae 100644 --- a/test/clustering_management_SUITE.erl +++ b/test/clustering_management_SUITE.erl @@ -644,6 +644,7 @@ assert_not_clustered(Node) -> assert_failure(Fun) -> case catch Fun() of + {error, Code, Reason} -> Reason; {error, Reason} -> Reason; {error_string, Reason} -> Reason; {badrpc, {'EXIT', Reason}} -> Reason; @@ -652,35 +653,35 @@ assert_failure(Fun) -> end. stop_app(Node) -> - control_action(stop_app, Node). + rabbit_control_helper:command(stop_app, Node). start_app(Node) -> - control_action(start_app, Node). + rabbit_control_helper:command(start_app, Node). join_cluster(Node, To) -> join_cluster(Node, To, false). join_cluster(Node, To, Ram) -> - control_action(join_cluster, Node, [atom_to_list(To)], [{"--ram", Ram}]). + rabbit_control_helper:command_with_output(join_cluster, Node, [atom_to_list(To)], [{"--ram", Ram}]). reset(Node) -> - control_action(reset, Node). + rabbit_control_helper:command(reset, Node). force_reset(Node) -> - control_action(force_reset, Node). + rabbit_control_helper:command(force_reset, Node). forget_cluster_node(Node, Removee, RemoveWhenOffline) -> - control_action(forget_cluster_node, Node, [atom_to_list(Removee)], + rabbit_control_helper:command(forget_cluster_node, Node, [atom_to_list(Removee)], [{"--offline", RemoveWhenOffline}]). forget_cluster_node(Node, Removee) -> forget_cluster_node(Node, Removee, false). change_cluster_node_type(Node, Type) -> - control_action(change_cluster_node_type, Node, [atom_to_list(Type)]). + rabbit_control_helper:command(change_cluster_node_type, Node, [atom_to_list(Type)]). update_cluster_nodes(Node, DiscoveryNode) -> - control_action(update_cluster_nodes, Node, [atom_to_list(DiscoveryNode)]). + rabbit_control_helper:command(update_cluster_nodes, Node, [atom_to_list(DiscoveryNode)]). stop_join_start(Node, ClusterTo, Ram) -> ok = stop_app(Node), @@ -695,17 +696,6 @@ stop_reset_start(Node) -> ok = reset(Node), ok = start_app(Node). -control_action(Command, Node) -> - control_action(Command, Node, [], []). - -control_action(Command, Node, Args) -> - control_action(Command, Node, Args, []). - -control_action(Command, Node, Args, Opts) -> - rpc:call(Node, rabbit_control_main, action, - [Command, Node, Args, Opts, - fun io:format/2]). - declare(Ch, Name) -> Res = amqp_channel:call(Ch, #'queue.declare'{durable = true, queue = Name}), diff --git a/test/eager_sync_SUITE.erl b/test/eager_sync_SUITE.erl index 93b308b6c5..70d7080269 100644 --- a/test/eager_sync_SUITE.erl +++ b/test/eager_sync_SUITE.erl @@ -23,7 +23,7 @@ -define(QNAME, <<"ha.two.test">>). -define(QNAME_AUTO, <<"ha.auto.test">>). --define(MESSAGE_COUNT, 2000). +-define(MESSAGE_COUNT, 200000). all() -> [ @@ -135,9 +135,11 @@ eager_sync_cancel(Config) -> amqp_channel:call(ACh, #'queue.declare'{queue = ?QNAME, durable = true}), {ok, not_syncing} = sync_cancel(C, ?QNAME), %% Idempotence - eager_sync_cancel_test2(Config, A, B, C, Ch). + eager_sync_cancel_test2(Config, A, B, C, Ch, 100). -eager_sync_cancel_test2(Config, A, B, C, Ch) -> +eager_sync_cancel_test2(_, _, _, _, _, 0) -> + error(no_more_attempts_left); +eager_sync_cancel_test2(Config, A, B, C, Ch, Attempts) -> %% Sync then cancel rabbit_ct_client_helpers:publish(Ch, ?QNAME, ?MESSAGE_COUNT), restart(Config, A), @@ -158,12 +160,12 @@ eager_sync_cancel_test2(Config, A, B, C, Ch) -> %% Damn. Syncing finished between wait_for_syncing/3 and %% sync_cancel/2 above. Start again. amqp_channel:call(Ch, #'queue.purge'{queue = ?QNAME}), - eager_sync_cancel_test2(Config, A, B, C, Ch) + eager_sync_cancel_test2(Config, A, B, C, Ch, Attempts - 1) end; synced_already -> %% Damn. Syncing finished before wait_for_syncing/3. Start again. amqp_channel:call(Ch, #'queue.purge'{queue = ?QNAME}), - eager_sync_cancel_test2(Config, A, B, C, Ch) + eager_sync_cancel_test2(Config, A, B, C, Ch, Attempts - 1) end. eager_sync_auto(Config) -> @@ -240,8 +242,8 @@ wait_for_sync(Node, QName) -> sync_detection_SUITE:wait_for_sync_status(true, Node, QName). action(Node, Action, QName) -> - rabbit_ct_broker_helpers:control_action( - Action, Node, [binary_to_list(QName)], [{"-p", "/"}]). + rabbit_control_helper:command_with_output( + Action, Node, [binary_to_list(QName)], [{"-p", "/"}]). queue(Node, QName) -> QNameRes = rabbit_misc:r(<<"/">>, queue, QName), @@ -273,6 +275,6 @@ state(Node, QName) -> %% in order to pass, because a SyncBatchSize >= ?MESSAGE_COUNT will %% always finish before the test is able to cancel the sync. set_app_sync_batch_size(Node) -> - rabbit_ct_broker_helpers:control_action( + rabbit_control_helper:command( eval, Node, ["application:set_env(rabbit, mirroring_sync_batch_size, 1)."]). diff --git a/test/per_vhost_connection_limit_SUITE.erl b/test/per_vhost_connection_limit_SUITE.erl index 592e57c41a..efc5ca830e 100644 --- a/test/per_vhost_connection_limit_SUITE.erl +++ b/test/per_vhost_connection_limit_SUITE.erl @@ -795,7 +795,7 @@ set_vhost_connection_limit(Config, VHost, Count) -> set_vhost_connection_limit(Config, NodeIndex, VHost, Count) -> Node = rabbit_ct_broker_helpers:get_node_config( Config, NodeIndex, nodename), - rabbit_ct_broker_helpers:control_action( + ok = rabbit_ct_broker_helpers:control_action( set_vhost_limits, Node, ["{\"max-connections\": " ++ integer_to_list(Count) ++ "}"], [{"-p", binary_to_list(VHost)}]). diff --git a/test/per_vhost_msg_store_SUITE.erl b/test/per_vhost_msg_store_SUITE.erl new file mode 100644 index 0000000000..4d88c84b7e --- /dev/null +++ b/test/per_vhost_msg_store_SUITE.erl @@ -0,0 +1,254 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(per_vhost_msg_store_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-compile(export_all). + +-define(MSGS_COUNT, 100). + +all() -> + [ + publish_to_different_dirs, + storage_deleted_on_vhost_delete, + single_vhost_storage_delete_is_safe + ]. + + + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:set_config( + Config, + [{rmq_nodename_suffix, ?MODULE}]), + Config2 = rabbit_ct_helpers:merge_app_env( + Config1, + {rabbit, [{queue_index_embed_msgs_below, 100}]}), + rabbit_ct_helpers:run_setup_steps( + Config2, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps( + Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(_, Config) -> + Vhost1 = <<"vhost1">>, + Vhost2 = <<"vhost2">>, + rabbit_ct_broker_helpers:add_vhost(Config, Vhost1), + rabbit_ct_broker_helpers:add_vhost(Config, Vhost2), + Chan1 = open_channel(Vhost1, Config), + Chan2 = open_channel(Vhost2, Config), + rabbit_ct_helpers:set_config( + Config, + [{vhost1, Vhost1}, {vhost2, Vhost2}, + {channel1, Chan1}, {channel2, Chan2}]). + +end_per_testcase(single_vhost_storage_delete_is_safe, Config) -> + Config; +end_per_testcase(_, Config) -> + Vhost1 = ?config(vhost1, Config), + Vhost2 = ?config(vhost2, Config), + rabbit_ct_broker_helpers:delete_vhost(Config, Vhost1), + rabbit_ct_broker_helpers:delete_vhost(Config, Vhost2), + Config. + +publish_to_different_dirs(Config) -> + Vhost1 = ?config(vhost1, Config), + Vhost2 = ?config(vhost2, Config), + Channel1 = ?config(channel1, Config), + Channel2 = ?config(channel2, Config), + Queue1 = declare_durable_queue(Channel1), + Queue2 = declare_durable_queue(Channel2), + FolderSize1 = get_folder_size(Vhost1, Config), + FolderSize2 = get_folder_size(Vhost2, Config), + + %% Publish message to a queue index + publish_persistent_messages(index, Channel1, Queue1), + %% First storage increased + FolderSize11 = get_folder_size(Vhost1, Config), + true = (FolderSize1 < FolderSize11), + %% Second storage didn't increased + FolderSize2 = get_folder_size(Vhost2, Config), + + %% Publish message to a message store + publish_persistent_messages(store, Channel1, Queue1), + %% First storage increased + FolderSize12 = get_folder_size(Vhost1, Config), + true = (FolderSize11 < FolderSize12), + %% Second storage didn't increased + FolderSize2 = get_folder_size(Vhost2, Config), + + %% Publish message to a queue index + publish_persistent_messages(index, Channel2, Queue2), + %% First storage increased + FolderSize21 = get_folder_size(Vhost2, Config), + true = (FolderSize2 < FolderSize21), + %% Second storage didn't increased + FolderSize12 = get_folder_size(Vhost1, Config), + + %% Publish message to a message store + publish_persistent_messages(store, Channel2, Queue2), + %% Second storage increased + FolderSize22 = get_folder_size(Vhost2, Config), + true = (FolderSize21 < FolderSize22), + %% First storage didn't increased + FolderSize12 = get_folder_size(Vhost1, Config). + +storage_deleted_on_vhost_delete(Config) -> + Vhost1 = ?config(vhost1, Config), + Channel1 = ?config(channel1, Config), + Queue1 = declare_durable_queue(Channel1), + FolderSize = get_global_folder_size(Config), + + publish_persistent_messages(index, Channel1, Queue1), + publish_persistent_messages(store, Channel1, Queue1), + FolderSizeAfterPublish = get_global_folder_size(Config), + + %% Total storage size increased + true = (FolderSize < FolderSizeAfterPublish), + + ok = rabbit_ct_broker_helpers:delete_vhost(Config, Vhost1), + + %% Total memory reduced + FolderSizeAfterDelete = get_global_folder_size(Config), + true = (FolderSizeAfterPublish > FolderSizeAfterDelete), + + %% There is no Vhost1 folder + 0 = get_folder_size(Vhost1, Config). + + +single_vhost_storage_delete_is_safe(Config) -> +ct:pal("Start test 3", []), + Vhost1 = ?config(vhost1, Config), + Vhost2 = ?config(vhost2, Config), + Channel1 = ?config(channel1, Config), + Channel2 = ?config(channel2, Config), + Queue1 = declare_durable_queue(Channel1), + Queue2 = declare_durable_queue(Channel2), + + %% Publish messages to both stores + publish_persistent_messages(index, Channel1, Queue1), + publish_persistent_messages(store, Channel1, Queue1), + publish_persistent_messages(index, Channel2, Queue2), + publish_persistent_messages(store, Channel2, Queue2), + + queue_is_not_empty(Channel2, Queue2), + % Vhost2Dir = vhost_dir(Vhost2, Config), + % [StoreFile] = filelib:wildcard(binary_to_list(filename:join([Vhost2Dir, "msg_store_persistent_*", "0.rdq"]))), + % ct:pal("Store file ~p~n", [file:read_file(StoreFile)]). +% ok. + rabbit_ct_broker_helpers:stop_broker(Config, 0), + delete_vhost_data(Vhost1, Config), + rabbit_ct_broker_helpers:start_broker(Config, 0), + + Channel11 = open_channel(Vhost1, Config), + Channel21 = open_channel(Vhost2, Config), + + %% There are no Vhost1 messages + queue_is_empty(Channel11, Queue1), + + %% But Vhost2 messages are in place + queue_is_not_empty(Channel21, Queue2), + consume_messages(index, Channel21, Queue2), + consume_messages(store, Channel21, Queue2). + +declare_durable_queue(Channel) -> + QName = list_to_binary(erlang:ref_to_list(make_ref())), + #'queue.declare_ok'{queue = QName} = + amqp_channel:call(Channel, + #'queue.declare'{queue = QName, durable = true}), + QName. + +publish_persistent_messages(Storage, Channel, Queue) -> + MessagePayload = case Storage of + index -> binary:copy(<<"=">>, 50); + store -> binary:copy(<<"-">>, 150) + end, + amqp_channel:call(Channel, #'confirm.select'{}), + [amqp_channel:call(Channel, + #'basic.publish'{routing_key = Queue}, + #amqp_msg{props = #'P_basic'{delivery_mode = 2}, + payload = MessagePayload}) + || _ <- lists:seq(1, ?MSGS_COUNT)], + amqp_channel:wait_for_confirms(Channel). + + +get_folder_size(Vhost, Config) -> + Dir = vhost_dir(Vhost, Config), + folder_size(Dir). + +folder_size(Dir) -> + filelib:fold_files(Dir, ".*", true, + fun(F,Acc) -> filelib:file_size(F) + Acc end, 0). + +get_global_folder_size(Config) -> + BaseDir = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_mnesia, dir, []), + folder_size(BaseDir). + +vhost_dir(Vhost, Config) -> + rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_vhost, msg_store_dir_path, [Vhost]). + +delete_vhost_data(Vhost, Config) -> + Dir = vhost_dir(Vhost, Config), + rabbit_file:recursive_delete([Dir]). + +queue_is_empty(Channel, Queue) -> + #'queue.declare_ok'{queue = Queue, message_count = 0} = + amqp_channel:call(Channel, + #'queue.declare'{ queue = Queue, + durable = true, + passive = true}). + +queue_is_not_empty(Channel, Queue) -> + #'queue.declare_ok'{queue = Queue, message_count = MsgCount} = + amqp_channel:call(Channel, + #'queue.declare'{ queue = Queue, + durable = true, + passive = true}), + ExpectedCount = ?MSGS_COUNT * 2, + ExpectedCount = MsgCount. + +consume_messages(Storage, Channel, Queue) -> + MessagePayload = case Storage of + index -> binary:copy(<<"=">>, 50); + store -> binary:copy(<<"-">>, 150) + end, + lists:foreach( + fun(I) -> + ct:pal("Consume message ~p~n ~p~n", [I, MessagePayload]), + {#'basic.get_ok'{}, Content} = + amqp_channel:call(Channel, + #'basic.get'{queue = Queue, + no_ack = true}), + #amqp_msg{payload = MessagePayload} = Content + end, + lists:seq(1, ?MSGS_COUNT)), + ok. + +open_channel(Vhost, Config) -> + Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + {ok, Conn} = amqp_connection:start( + #amqp_params_direct{node = Node, virtual_host = Vhost}), + {ok, Chan} = amqp_connection:open_channel(Conn), + Chan. diff --git a/test/plugins_SUITE.erl b/test/plugins_SUITE.erl deleted file mode 100644 index 8896298df1..0000000000 --- a/test/plugins_SUITE.erl +++ /dev/null @@ -1,80 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2016 Pivotal Software, Inc. All rights reserved. -%% - --module(plugins_SUITE). - --include_lib("common_test/include/ct.hrl"). - --compile(export_all). - -all() -> - [ - active_with_single_plugin_dir, - active_with_multiple_plugin_dirs - ]. - -%% ------------------------------------------------------------------- -%% Testsuite setup/teardown. -%% ------------------------------------------------------------------- - -init_per_suite(Config) -> - rabbit_ct_helpers:log_environment(), - application:load(rabbit), - rabbit_ct_helpers:run_setup_steps(Config). - -end_per_suite(Config) -> - rabbit_ct_helpers:run_teardown_steps(Config). - -init_per_group(_, Config) -> - Config. - -end_per_group(_, Config) -> - Config. - -init_per_testcase(Testcase, Config) -> - rabbit_ct_helpers:testcase_started(Config, Testcase). - -end_per_testcase(Testcase, Config) -> - rabbit_ct_helpers:testcase_finished(Config, Testcase). - -%% ------------------------------------------------------------------- -%% Testcases. -%% ------------------------------------------------------------------- - -active_with_single_plugin_dir(Config) -> - DataDir = rabbit_ct_helpers:get_config(Config, data_dir), - PluginsDir1 = filename:join(DataDir, "plugins1"), - - true = code:add_path(filename:join([PluginsDir1, - "mock_rabbitmq_plugins_01-0.1.0.ez", - "mock_rabbitmq_plugins_01-0.1.0", "ebin"])), - {ok, _} = application:ensure_all_started(mock_rabbitmq_plugins_01), - application:set_env(rabbit, plugins_dir, PluginsDir1), - - [mock_rabbitmq_plugins_01] = rabbit_plugins:active(). - -active_with_multiple_plugin_dirs(Config) -> - DataDir = rabbit_ct_helpers:get_config(Config, data_dir), - PluginsDir1 = filename:join(DataDir, "plugins1"), - PluginsDir2 = filename:join(DataDir, "plugins2"), - - true = code:add_path(filename:join([PluginsDir1, - "mock_rabbitmq_plugins_01-0.1.0.ez", - "mock_rabbitmq_plugins_01-0.1.0", "ebin"])), - {ok, _} = application:ensure_all_started(mock_rabbitmq_plugins_01), - application:set_env(rabbit, plugins_dir, PluginsDir1 ++ ":" ++ PluginsDir2), - - [mock_rabbitmq_plugins_01] = rabbit_plugins:active(). diff --git a/test/plugins_SUITE_data/plugins1/mock_rabbitmq_plugins_01-0.1.0.ez b/test/plugins_SUITE_data/plugins1/mock_rabbitmq_plugins_01-0.1.0.ez Binary files differdeleted file mode 100644 index 40cba9f16b..0000000000 --- a/test/plugins_SUITE_data/plugins1/mock_rabbitmq_plugins_01-0.1.0.ez +++ /dev/null diff --git a/test/rabbitmqctl_integration_SUITE.erl b/test/rabbitmqctl_integration_SUITE.erl index ef85472f48..bbc4447102 100644 --- a/test/rabbitmqctl_integration_SUITE.erl +++ b/test/rabbitmqctl_integration_SUITE.erl @@ -31,7 +31,6 @@ -export([list_queues_local/1 ,list_queues_offline/1 ,list_queues_online/1 - ,manage_global_parameters/1 ]). all() -> @@ -46,8 +45,7 @@ groups() -> [list_queues_local ,list_queues_online ,list_queues_offline - ]}, - {global_parameters, [], [manage_global_parameters]} + ]} ]. init_per_suite(Config) -> @@ -63,13 +61,6 @@ init_per_group(list_queues, Config0) -> Config1 = declare_some_queues(Config), rabbit_ct_broker_helpers:stop_node(Config1, NumNodes - 1), Config1; -init_per_group(global_parameters,Config) -> - Config1 = rabbit_ct_helpers:set_config(Config, [ - {rmq_nodename_suffix, ?MODULE} - ]), - rabbit_ct_helpers:run_setup_steps(Config1, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()); init_per_group(_, Config) -> Config. @@ -144,75 +135,6 @@ list_queues_offline(Config) -> assert_ctl_queues(Config, 1, ["--offline"], OfflineQueues), ok. -manage_global_parameters(Config) -> - 0 = length(global_parameters(Config)), - Parameter1Key = global_param1, - GlobalParameter1ValueAsString = "{\"a\":\"b\", \"c\":\"d\"}", - ok = control_action(Config, set_global_parameter, - [atom_to_list(Parameter1Key), - GlobalParameter1ValueAsString - ]), - - 1 = length(global_parameters(Config)), - - GlobalParameter1Value = rabbit_ct_broker_helpers:rpc( - Config, 0, - rabbit_runtime_parameters, value_global, - [Parameter1Key] - ), - - [{<<"a">>,<<"b">>}, {<<"c">>,<<"d">>}] = GlobalParameter1Value, - - Parameter2Key = global_param2, - GlobalParameter2ValueAsString = "{\"e\":\"f\", \"g\":\"h\"}", - ok = control_action(Config, set_global_parameter, - [atom_to_list(Parameter2Key), - GlobalParameter2ValueAsString - ]), - - 2 = length(global_parameters(Config)), - - GlobalParameter2Value = rabbit_ct_broker_helpers:rpc( - Config, 0, - rabbit_runtime_parameters, value_global, - [Parameter2Key] - ), - - [{<<"e">>,<<"f">>}, {<<"g">>,<<"h">>}] = GlobalParameter2Value, - - - GlobalParameter1Value2AsString = "{\"a\":\"z\", \"c\":\"d\"}", - ok = control_action(Config, set_global_parameter, - [atom_to_list(Parameter1Key), - GlobalParameter1Value2AsString - ]), - - 2 = length(global_parameters(Config)), - - GlobalParameter1Value2 = rabbit_ct_broker_helpers:rpc( - Config, 0, - rabbit_runtime_parameters, value_global, - [Parameter1Key] - ), - - [{<<"a">>,<<"z">>}, {<<"c">>,<<"d">>}] = GlobalParameter1Value2, - - ok = control_action(Config, clear_global_parameter, - [atom_to_list(Parameter1Key)] - ), - - 1 = length(global_parameters(Config)), - - not_found = rabbit_ct_broker_helpers:rpc( - Config, 0, - rabbit_runtime_parameters, value_global, - [Parameter1Key] - ), - - ok = control_action(Config, list_global_parameters, []), - - ok. - %%---------------------------------------------------------------------------- %% Helpers %%---------------------------------------------------------------------------- @@ -234,11 +156,7 @@ run_list_queues(Config, Node, Args) -> control_action(Config, Command, Args) -> Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), - rabbit_control_main:action( - Command, Node, Args, [], - fun (Format, Args1) -> - io:format(Format ++ " ...~n", Args1) - end). + rabbit_ct_broker_helpers:control_action(Command, Node, Args, []). global_parameters(Config) -> rabbit_ct_broker_helpers:rpc( diff --git a/test/sync_detection_SUITE.erl b/test/sync_detection_SUITE.erl index 1e0a66e8fd..3e5ed8208b 100644 --- a/test/sync_detection_SUITE.erl +++ b/test/sync_detection_SUITE.erl @@ -210,7 +210,7 @@ slave_pids(Node, Queue) -> _ -> Pids end. -%% The mnesia syncronization takes a while, but we don't want to wait for the +%% The mnesia synchronization takes a while, but we don't want to wait for the %% test to fail, since the timetrap is quite high. wait_for_sync_status(Status, Node, Queue) -> Max = 10000 / ?LOOP_RECURSION_DELAY, diff --git a/test/unit_inbroker_SUITE.erl b/test/unit_inbroker_SUITE.erl index a7f2ef4603..af86371fc2 100644 --- a/test/unit_inbroker_SUITE.erl +++ b/test/unit_inbroker_SUITE.erl @@ -22,8 +22,8 @@ -compile(export_all). --define(PERSISTENT_MSG_STORE, msg_store_persistent). --define(TRANSIENT_MSG_STORE, msg_store_transient). +-define(PERSISTENT_MSG_STORE, msg_store_persistent_vhost). +-define(TRANSIENT_MSG_STORE, msg_store_transient_vhost). -define(TIMEOUT_LIST_OPS_PASS, 5000). -define(TIMEOUT, 30000). @@ -70,7 +70,8 @@ all() -> [ {group, parallel_tests}, - {group, non_parallel_tests}, + {group, non_parallel_tests} + , {group, backing_queue_tests}, {group, cluster_tests}, @@ -88,24 +89,12 @@ groups() -> credit_flow_settings, dynamic_mirroring, gen_server2_with_state, - list_operations_timeout_pass, mcall, {password_hashing, [], [ password_hashing, change_password ]}, - {policy_validation, [parallel, {repeat, 20}], [ - ha_policy_validation, - policy_validation, - policy_opts_validation, - queue_master_location_policy_validation, - queue_modes_policy_validation, - vhost_removed_while_updating_policy - ]}, - runtime_parameters, - set_disk_free_limit_command, - topic_matching, - user_management + topic_matching ]}, {non_parallel_tests, [], [ app_management, %% Restart RabbitMQ. @@ -115,9 +104,7 @@ groups() -> head_message_timestamp_statistics, %% Expect specific statistics. log_management, %% Check log files. log_management_during_startup, %% Check log files. - memory_high_watermark, %% Trigger alarm. - externally_rotated_logs_are_automatically_reopened, %% Check log files. - server_status %% Trigger alarm. + externally_rotated_logs_are_automatically_reopened %% Check log files. ]}, {backing_queue_tests, [], [ msg_store, @@ -265,25 +252,41 @@ app_management(Config) -> ?MODULE, app_management1, [Config]). app_management1(_Config) -> - control_action(wait, [os:getenv("RABBITMQ_PID_FILE")]), + wait_for_application(rabbit), %% Starting, stopping and diagnostics. Note that we don't try %% 'report' when the rabbit app is stopped and that we enable %% tracing for the duration of this function. - ok = control_action(trace_on, []), - ok = control_action(stop_app, []), - ok = control_action(stop_app, []), - ok = control_action(status, []), - ok = control_action(cluster_status, []), - ok = control_action(environment, []), - ok = control_action(start_app, []), - ok = control_action(start_app, []), - ok = control_action(status, []), - ok = control_action(report, []), - ok = control_action(cluster_status, []), - ok = control_action(environment, []), - ok = control_action(trace_off, []), + ok = rabbit_trace:start(<<"/">>), + ok = rabbit:stop(), + ok = rabbit:stop(), + ok = no_exceptions(rabbit, status, []), + ok = no_exceptions(rabbit, environment, []), + ok = rabbit:start(), + ok = rabbit:start(), + ok = no_exceptions(rabbit, status, []), + ok = no_exceptions(rabbit, environment, []), + ok = rabbit_trace:stop(<<"/">>), passed. +no_exceptions(Mod, Fun, Args) -> + try erlang:apply(Mod, Fun, Args) of _ -> ok + catch Type:Ex -> {Type, Ex} + end. + +wait_for_application(Application) -> + wait_for_application(Application, 5000). + +wait_for_application(_, Time) when Time =< 0 -> + {error, timeout}; +wait_for_application(Application, Time) -> + Interval = 100, + case lists:keyfind(Application, 1, application:which_applications()) of + false -> + timer:sleep(Interval), + wait_for_application(Application, Time - Interval); + _ -> ok + end. + %% ------------------------------------------------------------------- %% Message store. %% ------------------------------------------------------------------- @@ -347,7 +350,7 @@ msg_store1(_Config) -> %% stop and restart, preserving every other msg in 2nd half ok = rabbit_variable_queue:stop_msg_store(), ok = rabbit_variable_queue:start_msg_store( - [], {fun ([]) -> finished; + #{}, {fun ([]) -> finished; ([MsgId|MsgIdsTail]) when length(MsgIdsTail) rem 2 == 0 -> {MsgId, 1, MsgIdsTail}; @@ -468,10 +471,10 @@ on_disk_stop(Pid) -> msg_store_client_init_capture(MsgStore, Ref) -> Pid = spawn(fun on_disk_capture/0), - {Pid, rabbit_msg_store:client_init( + {Pid, rabbit_msg_store_vhost_sup:client_init( MsgStore, Ref, fun (MsgIds, _ActionTaken) -> Pid ! {on_disk, MsgIds} - end, undefined)}. + end, undefined, <<"/">>)}. msg_store_contains(Atom, MsgIds, MSCState) -> Atom = lists:foldl( @@ -548,14 +551,14 @@ test_msg_store_confirm_timer() -> Ref = rabbit_guid:gen(), MsgId = msg_id_bin(1), Self = self(), - MSCState = rabbit_msg_store:client_init( + MSCState = rabbit_msg_store_vhost_sup:client_init( ?PERSISTENT_MSG_STORE, Ref, fun (MsgIds, _ActionTaken) -> case gb_sets:is_member(MsgId, MsgIds) of true -> Self ! on_disk; false -> ok end - end, undefined), + end, undefined, <<"/">>), ok = msg_store_write([MsgId], MSCState), ok = msg_store_keep_busy_until_confirm([msg_id_bin(2)], MSCState, false), ok = msg_store_remove([MsgId], MSCState), @@ -1424,7 +1427,7 @@ nop(_) -> ok. nop(_, _) -> ok. msg_store_client_init(MsgStore, Ref) -> - rabbit_msg_store:client_init(MsgStore, Ref, undefined, undefined). + rabbit_msg_store_vhost_sup:client_init(MsgStore, Ref, undefined, undefined, <<"/">>). variable_queue_init(Q, Recover) -> rabbit_variable_queue:init( @@ -1842,7 +1845,7 @@ log_management(Config) -> ?MODULE, log_management1, [Config]). log_management1(_Config) -> - [LogFile] = rabbit:log_locations(), + [LogFile|_] = rabbit:log_locations(), Suffix = ".0", ok = test_logs_working([LogFile]), @@ -1852,7 +1855,7 @@ log_management1(_Config) -> ok = test_logs_working([LogFile]), %% simple log rotation - ok = control_action(rotate_logs, []), + ok = rabbit:rotate_logs(), %% FIXME: rabbit:rotate_logs/0 is asynchronous due to a limitation %% in Lager. Therefore, we have no choice but to wait an arbitrary %% amount of time. @@ -1862,53 +1865,53 @@ log_management1(_Config) -> %% log rotation on empty files ok = clean_logs([LogFile], Suffix), - ok = control_action(rotate_logs, []), + ok = rabbit:rotate_logs(), timer:sleep(2000), [{error, enoent}, true] = non_empty_files([LogFile ++ Suffix, LogFile]), %% logs with suffix are not writable - ok = control_action(rotate_logs, []), + ok = rabbit:rotate_logs(), timer:sleep(2000), ok = make_files_non_writable([LogFile ++ Suffix]), - ok = control_action(rotate_logs, []), + ok = rabbit:rotate_logs(), timer:sleep(2000), ok = test_logs_working([LogFile]), %% rotate when original log files are not writable ok = make_files_non_writable([LogFile]), - ok = control_action(rotate_logs, []), + ok = rabbit:rotate_logs(), timer:sleep(2000), %% logging directed to tty (first, remove handlers) - ok = control_action(stop_app, []), + ok = rabbit:stop(), ok = clean_logs([LogFile], Suffix), ok = application:set_env(rabbit, lager_handler, tty), application:unset_env(lager, handlers), application:unset_env(lager, extra_sinks), - ok = control_action(start_app, []), + ok = rabbit:start(), timer:sleep(200), rabbit_log:info("test info"), [{error, enoent}] = empty_files([LogFile]), %% rotate logs when logging is turned off - ok = control_action(stop_app, []), + ok = rabbit:stop(), ok = clean_logs([LogFile], Suffix), ok = application:set_env(rabbit, lager_handler, false), application:unset_env(lager, handlers), application:unset_env(lager, extra_sinks), - ok = control_action(start_app, []), + ok = rabbit:start(), timer:sleep(200), rabbit_log:error("test error"), timer:sleep(200), [{error, enoent}] = empty_files([LogFile]), %% cleanup - ok = control_action(stop_app, []), + ok = rabbit:stop(), ok = clean_logs([LogFile], Suffix), ok = application:set_env(rabbit, lager_handler, LogFile), application:unset_env(lager, handlers), application:unset_env(lager, extra_sinks), - ok = control_action(start_app, []), + ok = rabbit:start(), ok = test_logs_working([LogFile]), passed. @@ -1917,84 +1920,80 @@ log_management_during_startup(Config) -> ?MODULE, log_management_during_startup1, [Config]). log_management_during_startup1(_Config) -> - [LogFile] = rabbit:log_locations(), + [LogFile|_] = rabbit:log_locations(), Suffix = ".0", %% start application with simple tty logging - ok = control_action(stop_app, []), + ok = rabbit:stop(), ok = clean_logs([LogFile], Suffix), ok = application:set_env(rabbit, lager_handler, tty), application:unset_env(lager, handlers), application:unset_env(lager, extra_sinks), - ok = control_action(start_app, []), + ok = rabbit:start(), %% start application with logging to non-existing directory NonExistent = "/tmp/non-existent/test.log", delete_file(NonExistent), delete_file(filename:dirname(NonExistent)), - ok = control_action(stop_app, []), + ok = rabbit:stop(), ok = application:set_env(rabbit, lager_handler, NonExistent), application:unset_env(lager, handlers), application:unset_env(lager, extra_sinks), - ok = control_action(start_app, []), + ok = rabbit:start(), %% start application with logging to directory with no %% write permissions - ok = control_action(stop_app, []), + ok = rabbit:stop(), NoPermission1 = "/var/empty/test.log", delete_file(NoPermission1), delete_file(filename:dirname(NoPermission1)), - ok = control_action(stop_app, []), + ok = rabbit:stop(), ok = application:set_env(rabbit, lager_handler, NoPermission1), application:unset_env(lager, handlers), application:unset_env(lager, extra_sinks), - ok = case control_action(start_app, []) of - ok -> exit({got_success_but_expected_failure, - log_rotation_no_write_permission_dir_test}); - {badrpc, - {'EXIT', {error, {cannot_log_to_file, _, Reason1}}}} - when Reason1 =:= enoent orelse Reason1 =:= eacces -> ok; - {badrpc, - {'EXIT', - {error, {cannot_log_to_file, _, - {cannot_create_parent_dirs, _, Reason1}}}}} - when Reason1 =:= eperm orelse - Reason1 =:= eacces orelse - Reason1 =:= enoent-> ok - end, + ok = try rabbit:start() of + ok -> exit({got_success_but_expected_failure, + log_rotation_no_write_permission_dir_test}) + catch + _:{error, {cannot_log_to_file, _, Reason1}} + when Reason1 =:= enoent orelse Reason1 =:= eacces -> ok; + _:{error, {cannot_log_to_file, _, + {cannot_create_parent_dirs, _, Reason1}}} + when Reason1 =:= eperm orelse + Reason1 =:= eacces orelse + Reason1 =:= enoent-> ok + end, %% start application with logging to a subdirectory which %% parent directory has no write permissions NoPermission2 = "/var/empty/non-existent/test.log", delete_file(NoPermission2), delete_file(filename:dirname(NoPermission2)), - case control_action(stop_app, []) of + case rabbit:stop() of ok -> ok; {error, lager_not_running} -> ok end, ok = application:set_env(rabbit, lager_handler, NoPermission2), application:unset_env(lager, handlers), application:unset_env(lager, extra_sinks), - ok = case control_action(start_app, []) of - ok -> exit({got_success_but_expected_failure, - log_rotatation_parent_dirs_test}); - {badrpc, - {'EXIT', {error, {cannot_log_to_file, _, Reason2}}}} - when Reason2 =:= enoent orelse Reason2 =:= eacces -> ok; - {badrpc, - {'EXIT', - {error, {cannot_log_to_file, _, - {cannot_create_parent_dirs, _, Reason2}}}}} - when Reason2 =:= eperm orelse - Reason2 =:= eacces orelse - Reason2 =:= enoent-> ok - end, + ok = try rabbit:start() of + ok -> exit({got_success_but_expected_failure, + log_rotatation_parent_dirs_test}) + catch + _:{error, {cannot_log_to_file, _, Reason2}} + when Reason2 =:= enoent orelse Reason2 =:= eacces -> ok; + _:{error, {cannot_log_to_file, _, + {cannot_create_parent_dirs, _, Reason2}}} + when Reason2 =:= eperm orelse + Reason2 =:= eacces orelse + Reason2 =:= enoent-> ok + end, %% cleanup ok = application:set_env(rabbit, lager_handler, LogFile), application:unset_env(lager, handlers), application:unset_env(lager, extra_sinks), - ok = control_action(start_app, []), + ok = rabbit:start(), passed. externally_rotated_logs_are_automatically_reopened(Config) -> @@ -2002,7 +2001,7 @@ externally_rotated_logs_are_automatically_reopened(Config) -> ?MODULE, externally_rotated_logs_are_automatically_reopened1, [Config]). externally_rotated_logs_are_automatically_reopened1(_Config) -> - [LogFile] = rabbit:log_locations(), + [LogFile|_] = rabbit:log_locations(), %% Make sure log file is opened ok = test_logs_working([LogFile]), @@ -2185,599 +2184,6 @@ change_password1(_Config) -> %% rabbitmqctl. %% ------------------------------------------------------------------- -list_operations_timeout_pass(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, list_operations_timeout_pass1, [Config]). - -list_operations_timeout_pass1(Config) -> - %% create a few things so there is some useful information to list - {_Writer1, Limiter1, Ch1} = rabbit_ct_broker_helpers:test_channel(), - {_Writer2, Limiter2, Ch2} = rabbit_ct_broker_helpers:test_channel(), - - [Q, Q2] = [Queue || Name <- [<<"list_operations_timeout_pass-q1">>, - <<"list_operations_timeout_pass-q2">>], - {new, Queue = #amqqueue{}} <- - [rabbit_amqqueue:declare( - rabbit_misc:r(<<"/">>, queue, Name), - false, false, [], none)]], - - ok = rabbit_amqqueue:basic_consume( - Q, true, Ch1, Limiter1, false, 0, <<"ctag1">>, true, [], - undefined), - ok = rabbit_amqqueue:basic_consume( - Q2, true, Ch2, Limiter2, false, 0, <<"ctag2">>, true, [], - undefined), - - %% list users - ok = control_action(add_user, - ["list_operations_timeout_pass-user", - "list_operations_timeout_pass-password"]), - {error, {user_already_exists, _}} = - control_action(add_user, - ["list_operations_timeout_pass-user", - "list_operations_timeout_pass-password"]), - ok = control_action_t(list_users, [], ?TIMEOUT_LIST_OPS_PASS), - - %% list parameters - ok = dummy_runtime_parameters:register(), - ok = control_action(set_parameter, ["test", "good", "123"]), - ok = control_action_t(list_parameters, [], ?TIMEOUT_LIST_OPS_PASS), - ok = control_action(clear_parameter, ["test", "good"]), - dummy_runtime_parameters:unregister(), - - %% list vhosts - ok = control_action(add_vhost, ["/list_operations_timeout_pass-vhost"]), - {error, {vhost_already_exists, _}} = - control_action(add_vhost, ["/list_operations_timeout_pass-vhost"]), - ok = control_action_t(list_vhosts, [], ?TIMEOUT_LIST_OPS_PASS), - - %% list permissions - ok = control_action(set_permissions, - ["list_operations_timeout_pass-user", ".*", ".*", ".*"], - [{"-p", "/list_operations_timeout_pass-vhost"}]), - ok = control_action_t(list_permissions, [], - [{"-p", "/list_operations_timeout_pass-vhost"}], - ?TIMEOUT_LIST_OPS_PASS), - - %% list user permissions - ok = control_action_t(list_user_permissions, - ["list_operations_timeout_pass-user"], - ?TIMEOUT_LIST_OPS_PASS), - - %% list policies - ok = control_action_opts( - ["set_policy", "list_operations_timeout_pass-policy", ".*", - "{\"ha-mode\":\"all\"}"]), - ok = control_action_t(list_policies, [], ?TIMEOUT_LIST_OPS_PASS), - ok = control_action(clear_policy, ["list_operations_timeout_pass-policy"]), - - %% list queues - ok = info_action_t(list_queues, - rabbit_amqqueue:info_keys(), false, - ?TIMEOUT_LIST_OPS_PASS), - - %% list exchanges - ok = info_action_t(list_exchanges, - rabbit_exchange:info_keys(), true, - ?TIMEOUT_LIST_OPS_PASS), - - %% list bindings - ok = info_action_t(list_bindings, - rabbit_binding:info_keys(), true, - ?TIMEOUT_LIST_OPS_PASS), - - %% list connections - H = ?config(rmq_hostname, Config), - P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), - {ok, C1} = gen_tcp:connect(H, P, [binary, {active, false}]), - gen_tcp:send(C1, <<"AMQP", 0, 0, 9, 1>>), - {ok, <<1,0,0>>} = gen_tcp:recv(C1, 3, 100), - - {ok, C2} = gen_tcp:connect(H, P, [binary, {active, false}]), - gen_tcp:send(C2, <<"AMQP", 0, 0, 9, 1>>), - {ok, <<1,0,0>>} = gen_tcp:recv(C2, 3, 100), - - ok = info_action_t( - list_connections, rabbit_networking:connection_info_keys(), false, - ?TIMEOUT_LIST_OPS_PASS), - - %% list consumers - ok = info_action_t( - list_consumers, rabbit_amqqueue:consumer_info_keys(), false, - ?TIMEOUT_LIST_OPS_PASS), - - %% list channels - ok = info_action_t( - list_channels, rabbit_channel:info_keys(), false, - ?TIMEOUT_LIST_OPS_PASS), - - %% do some cleaning up - ok = control_action(delete_user, ["list_operations_timeout_pass-user"]), - {error, {no_such_user, _}} = - control_action(delete_user, ["list_operations_timeout_pass-user"]), - - ok = control_action(delete_vhost, ["/list_operations_timeout_pass-vhost"]), - {error, {no_such_vhost, _}} = - control_action(delete_vhost, ["/list_operations_timeout_pass-vhost"]), - - %% close_connection - Conns = rabbit_ct_broker_helpers:get_connection_pids([C1, C2]), - [ok, ok] = [ok = control_action( - close_connection, [rabbit_misc:pid_to_string(ConnPid), "go away"]) - || ConnPid <- Conns], - - %% cleanup queues - [{ok, _} = rabbit_amqqueue:delete(QR, false, false) || QR <- [Q, Q2]], - - [begin - unlink(Chan), - ok = rabbit_channel:shutdown(Chan) - end || Chan <- [Ch1, Ch2]], - passed. - -user_management(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, user_management1, [Config]). - -user_management1(_Config) -> - - %% lots if stuff that should fail - {error, {no_such_user, _}} = - control_action(delete_user, - ["user_management-user"]), - {error, {no_such_user, _}} = - control_action(change_password, - ["user_management-user", "user_management-password"]), - {error, {no_such_vhost, _}} = - control_action(delete_vhost, - ["/user_management-vhost"]), - {error, {no_such_user, _}} = - control_action(set_permissions, - ["user_management-user", ".*", ".*", ".*"]), - {error, {no_such_user, _}} = - control_action(clear_permissions, - ["user_management-user"]), - {error, {no_such_user, _}} = - control_action(list_user_permissions, - ["user_management-user"]), - {error, {no_such_vhost, _}} = - control_action(list_permissions, [], - [{"-p", "/user_management-vhost"}]), - {error, {invalid_regexp, _, _}} = - control_action(set_permissions, - ["guest", "+foo", ".*", ".*"]), - {error, {no_such_user, _}} = - control_action(set_user_tags, - ["user_management-user", "bar"]), - - %% user creation - ok = control_action(add_user, - ["user_management-user", "user_management-password"]), - {error, {user_already_exists, _}} = - control_action(add_user, - ["user_management-user", "user_management-password"]), - ok = control_action(clear_password, - ["user_management-user"]), - ok = control_action(change_password, - ["user_management-user", "user_management-newpassword"]), - - TestTags = fun (Tags) -> - Args = ["user_management-user" | [atom_to_list(T) || T <- Tags]], - ok = control_action(set_user_tags, Args), - {ok, #internal_user{tags = Tags}} = - rabbit_auth_backend_internal:lookup_user( - <<"user_management-user">>), - ok = control_action(list_users, []) - end, - TestTags([foo, bar, baz]), - TestTags([administrator]), - TestTags([]), - - %% user authentication - ok = control_action(authenticate_user, - ["user_management-user", "user_management-newpassword"]), - {refused, _User, _Format, _Params} = - control_action(authenticate_user, - ["user_management-user", "user_management-password"]), - - %% vhost creation - ok = control_action(add_vhost, - ["/user_management-vhost"]), - {error, {vhost_already_exists, _}} = - control_action(add_vhost, - ["/user_management-vhost"]), - ok = control_action(list_vhosts, []), - - %% user/vhost mapping - ok = control_action(set_permissions, - ["user_management-user", ".*", ".*", ".*"], - [{"-p", "/user_management-vhost"}]), - ok = control_action(set_permissions, - ["user_management-user", ".*", ".*", ".*"], - [{"-p", "/user_management-vhost"}]), - ok = control_action(set_permissions, - ["user_management-user", ".*", ".*", ".*"], - [{"-p", "/user_management-vhost"}]), - ok = control_action(list_permissions, [], - [{"-p", "/user_management-vhost"}]), - ok = control_action(list_permissions, [], - [{"-p", "/user_management-vhost"}]), - ok = control_action(list_user_permissions, - ["user_management-user"]), - - %% user/vhost unmapping - ok = control_action(clear_permissions, - ["user_management-user"], [{"-p", "/user_management-vhost"}]), - ok = control_action(clear_permissions, - ["user_management-user"], [{"-p", "/user_management-vhost"}]), - - %% vhost deletion - ok = control_action(delete_vhost, - ["/user_management-vhost"]), - {error, {no_such_vhost, _}} = - control_action(delete_vhost, - ["/user_management-vhost"]), - - %% deleting a populated vhost - ok = control_action(add_vhost, - ["/user_management-vhost"]), - ok = control_action(set_permissions, - ["user_management-user", ".*", ".*", ".*"], - [{"-p", "/user_management-vhost"}]), - {new, _} = rabbit_amqqueue:declare( - rabbit_misc:r(<<"/user_management-vhost">>, queue, - <<"user_management-vhost-queue">>), - true, false, [], none), - ok = control_action(delete_vhost, - ["/user_management-vhost"]), - - %% user deletion - ok = control_action(delete_user, - ["user_management-user"]), - {error, {no_such_user, _}} = - control_action(delete_user, - ["user_management-user"]), - - passed. - -runtime_parameters(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, runtime_parameters1, [Config]). - -runtime_parameters1(_Config) -> - dummy_runtime_parameters:register(), - Good = fun(L) -> ok = control_action(set_parameter, L) end, - Bad = fun(L) -> {error_string, _} = control_action(set_parameter, L) end, - - %% Acceptable for bijection - Good(["test", "good", "\"ignore\""]), - Good(["test", "good", "123"]), - Good(["test", "good", "true"]), - Good(["test", "good", "false"]), - Good(["test", "good", "null"]), - Good(["test", "good", "{\"key\": \"value\"}"]), - - %% Invalid json - Bad(["test", "good", "atom"]), - Bad(["test", "good", "{\"foo\": \"bar\""]), - Bad(["test", "good", "{foo: \"bar\"}"]), - - %% Test actual validation hook - Good(["test", "maybe", "\"good\""]), - Bad(["test", "maybe", "\"bad\""]), - Good(["test", "admin", "\"ignore\""]), %% ctl means 'user' -> none - - ok = control_action(list_parameters, []), - - ok = control_action(clear_parameter, ["test", "good"]), - ok = control_action(clear_parameter, ["test", "maybe"]), - ok = control_action(clear_parameter, ["test", "admin"]), - {error_string, _} = - control_action(clear_parameter, ["test", "neverexisted"]), - - %% We can delete for a component that no longer exists - Good(["test", "good", "\"ignore\""]), - dummy_runtime_parameters:unregister(), - ok = control_action(clear_parameter, ["test", "good"]), - passed. - -policy_validation(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, policy_validation1, [Config]). - -policy_validation1(_Config) -> - PolicyName = "runtime_parameters-policy", - dummy_runtime_parameters:register_policy_validator(), - SetPol = fun (Key, Val) -> - control_action_opts( - ["set_policy", PolicyName, ".*", - rabbit_misc:format("{\"~s\":~p}", [Key, Val])]) - end, - OK = fun (Key, Val) -> - ok = SetPol(Key, Val), - true = does_policy_exist(PolicyName, - [{definition, [{list_to_binary(Key), Val}]}]) - end, - - OK("testeven", []), - OK("testeven", [1, 2]), - OK("testeven", [1, 2, 3, 4]), - OK("testpos", [2, 5, 5678]), - - {error_string, _} = SetPol("testpos", [-1, 0, 1]), - {error_string, _} = SetPol("testeven", [ 1, 2, 3]), - - ok = control_action(clear_policy, [PolicyName]), - dummy_runtime_parameters:unregister_policy_validator(), - passed. - -policy_opts_validation(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, policy_opts_validation1, [Config]). - -policy_opts_validation1(_Config) -> - PolicyName = "policy_opts_validation-policy", - Set = fun (Extra) -> control_action_opts( - ["set_policy", PolicyName, - ".*", "{\"ha-mode\":\"all\"}" - | Extra]) end, - OK = fun (Extra, Props) -> - ok = Set(Extra), - true = does_policy_exist(PolicyName, Props) - end, - Fail = fun (Extra) -> - case Set(Extra) of - {error_string, _} -> ok; - no_command when Extra =:= ["--priority"] -> ok; - no_command when Extra =:= ["--apply-to"] -> ok; - {'EXIT', - {function_clause, - [{rabbit_control_main,action, _, _} | _]}} - when Extra =:= ["--offline"] -> ok - end - end, - - OK ([], [{priority, 0}, {'apply-to', <<"all">>}]), - - OK (["--priority", "0"], [{priority, 0}]), - OK (["--priority", "3"], [{priority, 3}]), - Fail(["--priority", "banana"]), - Fail(["--priority"]), - - OK (["--apply-to", "all"], [{'apply-to', <<"all">>}]), - OK (["--apply-to", "queues"], [{'apply-to', <<"queues">>}]), - Fail(["--apply-to", "bananas"]), - Fail(["--apply-to"]), - - OK (["--priority", "3", "--apply-to", "queues"], [{priority, 3}, {'apply-to', <<"queues">>}]), - Fail(["--priority", "banana", "--apply-to", "queues"]), - Fail(["--priority", "3", "--apply-to", "bananas"]), - - Fail(["--offline"]), - - ok = control_action(clear_policy, [PolicyName]), - passed. - -ha_policy_validation(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, ha_policy_validation1, [Config]). - -ha_policy_validation1(_Config) -> - PolicyName = "ha_policy_validation-policy", - Set = fun (JSON) -> control_action_opts( - ["set_policy", PolicyName, - ".*", JSON]) end, - OK = fun (JSON, Def) -> - ok = Set(JSON), - true = does_policy_exist(PolicyName, [{definition, Def}]) - end, - Fail = fun (JSON) -> {error_string, _} = Set(JSON) end, - - OK ("{\"ha-mode\":\"all\"}", [{<<"ha-mode">>, <<"all">>}]), - Fail("{\"ha-mode\":\"made_up\"}"), - - Fail("{\"ha-mode\":\"nodes\"}"), - Fail("{\"ha-mode\":\"nodes\",\"ha-params\":2}"), - Fail("{\"ha-mode\":\"nodes\",\"ha-params\":[\"a\",2]}"), - OK ("{\"ha-mode\":\"nodes\",\"ha-params\":[\"a\",\"b\"]}", - [{<<"ha-mode">>, <<"nodes">>}, {<<"ha-params">>, [<<"a">>, <<"b">>]}]), - Fail("{\"ha-params\":[\"a\",\"b\"]}"), - - Fail("{\"ha-mode\":\"exactly\"}"), - Fail("{\"ha-mode\":\"exactly\",\"ha-params\":[\"a\",\"b\"]}"), - OK ("{\"ha-mode\":\"exactly\",\"ha-params\":2}", - [{<<"ha-mode">>, <<"exactly">>}, {<<"ha-params">>, 2}]), - Fail("{\"ha-params\":2}"), - - OK ("{\"ha-mode\":\"all\",\"ha-sync-mode\":\"manual\"}", - [{<<"ha-mode">>, <<"all">>}, {<<"ha-sync-mode">>, <<"manual">>}]), - OK ("{\"ha-mode\":\"all\",\"ha-sync-mode\":\"automatic\"}", - [{<<"ha-mode">>, <<"all">>}, {<<"ha-sync-mode">>, <<"automatic">>}]), - Fail("{\"ha-mode\":\"all\",\"ha-sync-mode\":\"made_up\"}"), - Fail("{\"ha-sync-mode\":\"manual\"}"), - Fail("{\"ha-sync-mode\":\"automatic\"}"), - - ok = control_action(clear_policy, [PolicyName]), - passed. - -queue_master_location_policy_validation(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, queue_master_location_policy_validation1, [Config]). - -queue_master_location_policy_validation1(_Config) -> - PolicyName = "queue_master_location_policy_validation-policy", - Set = fun (JSON) -> - control_action_opts( - ["set_policy", PolicyName, ".*", JSON]) - end, - OK = fun (JSON, Def) -> - ok = Set(JSON), - true = does_policy_exist(PolicyName, [{definition, Def}]) - end, - Fail = fun (JSON) -> {error_string, _} = Set(JSON) end, - - OK ("{\"queue-master-locator\":\"min-masters\"}", - [{<<"queue-master-locator">>, <<"min-masters">>}]), - OK ("{\"queue-master-locator\":\"client-local\"}", - [{<<"queue-master-locator">>, <<"client-local">>}]), - OK ("{\"queue-master-locator\":\"random\"}", - [{<<"queue-master-locator">>, <<"random">>}]), - Fail("{\"queue-master-locator\":\"made_up\"}"), - - ok = control_action(clear_policy, [PolicyName]), - passed. - -queue_modes_policy_validation(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, queue_modes_policy_validation1, [Config]). - -queue_modes_policy_validation1(_Config) -> - PolicyName = "queue_modes_policy_validation-policy", - Set = fun (JSON) -> - control_action_opts( - ["set_policy", PolicyName, ".*", JSON]) - end, - OK = fun (JSON, Def) -> - ok = Set(JSON), - true = does_policy_exist(PolicyName, [{definition, Def}]) - end, - Fail = fun (JSON) -> {error_string, _} = Set(JSON) end, - - OK ("{\"queue-mode\":\"lazy\"}", - [{<<"queue-mode">>, <<"lazy">>}]), - OK ("{\"queue-mode\":\"default\"}", - [{<<"queue-mode">>, <<"default">>}]), - Fail("{\"queue-mode\":\"wrong\"}"), - - ok = control_action(clear_policy, [PolicyName]), - passed. - -vhost_removed_while_updating_policy(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, vhost_removed_while_updating_policy1, [Config]). - -vhost_removed_while_updating_policy1(_Config) -> - VHost = "/vhost_removed_while_updating_policy-vhost", - PolicyName = "vhost_removed_while_updating_policy-policy", - - ok = control_action(add_vhost, [VHost]), - ok = control_action_opts( - ["set_policy", "-p", VHost, PolicyName, ".*", "{\"ha-mode\":\"all\"}"]), - true = does_policy_exist(PolicyName, []), - - %% Removing the vhost triggers the deletion of the policy. Once - %% the policy and the vhost are actually removed, RabbitMQ calls - %% update_policies() which lists policies on the given vhost. This - %% obviously fails because the vhost is gone, but the call should - %% still succeed. - ok = control_action(delete_vhost, [VHost]), - false = does_policy_exist(PolicyName, []), - - passed. - -does_policy_exist(PolicyName, Props) -> - PolicyNameBin = list_to_binary(PolicyName), - Policies = lists:filter( - fun(Policy) -> - lists:member({name, PolicyNameBin}, Policy) - end, rabbit_policy:list()), - case Policies of - [Policy] -> check_policy_props(Policy, Props); - [] -> false; - _ -> false - end. - -check_policy_props(Policy, [Prop | Rest]) -> - case lists:member(Prop, Policy) of - true -> check_policy_props(Policy, Rest); - false -> false - end; -check_policy_props(_Policy, []) -> - true. - -server_status(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, server_status1, [Config]). - -server_status1(Config) -> - %% create a few things so there is some useful information to list - {_Writer, Limiter, Ch} = rabbit_ct_broker_helpers:test_channel(), - [Q, Q2] = [Queue || {Name, Owner} <- [{<<"server_status-q1">>, none}, - {<<"server_status-q2">>, self()}], - {new, Queue = #amqqueue{}} <- - [rabbit_amqqueue:declare( - rabbit_misc:r(<<"/">>, queue, Name), - false, false, [], Owner)]], - ok = rabbit_amqqueue:basic_consume( - Q, true, Ch, Limiter, false, 0, <<"ctag">>, true, [], undefined), - - %% list queues - ok = info_action(list_queues, - rabbit_amqqueue:info_keys(), true), - - %% as we have no way to collect output of - %% info_action/3 call, the only way we - %% can test individual queueinfoitems is by directly calling - %% rabbit_amqqueue:info/2 - [{exclusive, false}] = rabbit_amqqueue:info(Q, [exclusive]), - [{exclusive, true}] = rabbit_amqqueue:info(Q2, [exclusive]), - - %% list exchanges - ok = info_action(list_exchanges, - rabbit_exchange:info_keys(), true), - - %% list bindings - ok = info_action(list_bindings, - rabbit_binding:info_keys(), true), - %% misc binding listing APIs - [_|_] = rabbit_binding:list_for_source( - rabbit_misc:r(<<"/">>, exchange, <<"">>)), - [_] = rabbit_binding:list_for_destination( - rabbit_misc:r(<<"/">>, queue, <<"server_status-q1">>)), - [_] = rabbit_binding:list_for_source_and_destination( - rabbit_misc:r(<<"/">>, exchange, <<"">>), - rabbit_misc:r(<<"/">>, queue, <<"server_status-q1">>)), - - %% list connections - H = ?config(rmq_hostname, Config), - P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), - {ok, C} = gen_tcp:connect(H, P, []), - gen_tcp:send(C, <<"AMQP", 0, 0, 9, 1>>), - timer:sleep(100), - ok = info_action(list_connections, - rabbit_networking:connection_info_keys(), false), - %% close_connection - [ConnPid] = rabbit_ct_broker_helpers:get_connection_pids([C]), - ok = control_action(close_connection, - [rabbit_misc:pid_to_string(ConnPid), "go away"]), - - %% list channels - ok = info_action(list_channels, rabbit_channel:info_keys(), false), - - %% list consumers - ok = control_action(list_consumers, []), - - %% set vm memory high watermark - HWM = vm_memory_monitor:get_vm_memory_high_watermark(), - ok = control_action(set_vm_memory_high_watermark, ["1"]), - ok = control_action(set_vm_memory_high_watermark, ["1.0"]), - %% this will trigger an alarm - ok = control_action(set_vm_memory_high_watermark, ["0.0"]), - %% reset - ok = control_action(set_vm_memory_high_watermark, [float_to_list(HWM)]), - - %% eval - {error_string, _} = control_action(eval, ["\""]), - {error_string, _} = control_action(eval, ["a("]), - ok = control_action(eval, ["a."]), - - %% cleanup - [{ok, _} = rabbit_amqqueue:delete(QR, false, false) || QR <- [Q, Q2]], - - unlink(Ch), - ok = rabbit_channel:shutdown(Ch), - - passed. amqp_connection_refusal(Config) -> passed = rabbit_ct_broker_helpers:rpc(Config, 0, @@ -3395,39 +2801,6 @@ configurable_server_properties1(_Config) -> application:set_env(rabbit, server_properties, ServerProperties), passed. -memory_high_watermark(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, memory_high_watermark1, [Config]). - -memory_high_watermark1(_Config) -> - %% set vm memory high watermark - HWM = vm_memory_monitor:get_vm_memory_high_watermark(), - %% this will trigger an alarm - ok = control_action(set_vm_memory_high_watermark, - ["absolute", "2000"]), - [{{resource_limit,memory,_},[]}] = rabbit_alarm:get_alarms(), - %% reset - ok = control_action(set_vm_memory_high_watermark, - [float_to_list(HWM)]), - - passed. - -set_disk_free_limit_command(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, set_disk_free_limit_command1, [Config]). - -set_disk_free_limit_command1(_Config) -> - ok = control_action(set_disk_free_limit, - ["2000kiB"]), - 2048000 = rabbit_disk_monitor:get_disk_free_limit(), - ok = control_action(set_disk_free_limit, - ["mem_relative", "1.1"]), - ExpectedLimit = 1.1 * vm_memory_monitor:get_total_memory(), - % Total memory is unstable, so checking order - true = ExpectedLimit/rabbit_disk_monitor:get_disk_free_limit() < 1.2, - true = ExpectedLimit/rabbit_disk_monitor:get_disk_free_limit() > 0.98, - ok = control_action(set_disk_free_limit, ["50MB"]), - passed. disk_monitor(Config) -> passed = rabbit_ct_broker_helpers:rpc(Config, 0, @@ -3715,83 +3088,6 @@ expect_event(Tag, Key, Type) -> %% rabbitmqctl helpers. %% --------------------------------------------------------------------------- -control_action(Command, Args) -> - control_action(Command, node(), Args, default_options()). - -control_action(Command, Args, NewOpts) -> - control_action(Command, node(), Args, - expand_options(default_options(), NewOpts)). - -control_action(Command, Node, Args, Opts) -> - case catch rabbit_control_main:action( - Command, Node, Args, Opts, - fun (Format, Args1) -> - io:format(Format ++ " ...~n", Args1) - end) of - ok -> - io:format("done.~n"), - ok; - {ok, Result} -> - rabbit_control_misc:print_cmd_result(Command, Result), - ok; - Other -> - io:format("failed: ~p~n", [Other]), - Other - end. - -control_action_t(Command, Args, Timeout) when is_number(Timeout) -> - control_action_t(Command, node(), Args, default_options(), Timeout). - -control_action_t(Command, Args, NewOpts, Timeout) when is_number(Timeout) -> - control_action_t(Command, node(), Args, - expand_options(default_options(), NewOpts), - Timeout). - -control_action_t(Command, Node, Args, Opts, Timeout) when is_number(Timeout) -> - case catch rabbit_control_main:action( - Command, Node, Args, Opts, - fun (Format, Args1) -> - io:format(Format ++ " ...~n", Args1) - end, Timeout) of - ok -> - io:format("done.~n"), - ok; - {ok, Result} -> - rabbit_control_misc:print_cmd_result(Command, Result), - ok; - Other -> - io:format("failed: ~p~n", [Other]), - Other - end. - -control_action_opts(Raw) -> - NodeStr = atom_to_list(node()), - case rabbit_control_main:parse_arguments(Raw, NodeStr) of - {ok, {Cmd, Opts, Args}} -> - case control_action(Cmd, node(), Args, Opts) of - ok -> ok; - Error -> Error - end; - Error -> - Error - end. - -info_action(Command, Args, CheckVHost) -> - ok = control_action(Command, []), - if CheckVHost -> ok = control_action(Command, [], ["-p", "/"]); - true -> ok - end, - ok = control_action(Command, lists:map(fun atom_to_list/1, Args)), - {bad_argument, dummy} = control_action(Command, ["dummy"]), - ok. - -info_action_t(Command, Args, CheckVHost, Timeout) when is_number(Timeout) -> - if CheckVHost -> ok = control_action_t(Command, [], ["-p", "/"], Timeout); - true -> ok - end, - ok = control_action_t(Command, lists:map(fun atom_to_list/1, Args), Timeout), - ok. - default_options() -> [{"-p", "/"}, {"-q", "false"}]. expand_options(As, Bs) -> |
