summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/rabbitmqctl_integration_SUITE.erl113
-rw-r--r--test/worker_pool_SUITE.erl193
2 files changed, 300 insertions, 6 deletions
diff --git a/test/rabbitmqctl_integration_SUITE.erl b/test/rabbitmqctl_integration_SUITE.erl
index 9305781bda..ef85472f48 100644
--- a/test/rabbitmqctl_integration_SUITE.erl
+++ b/test/rabbitmqctl_integration_SUITE.erl
@@ -31,17 +31,24 @@
-export([list_queues_local/1
,list_queues_offline/1
,list_queues_online/1
+ ,manage_global_parameters/1
]).
all() ->
- [{group, list_queues}].
+ [
+ {group, list_queues},
+ {group, global_parameters}
+ ].
groups() ->
- [{list_queues, [],
- [list_queues_local
- ,list_queues_online
- ,list_queues_offline
- ]}].
+ [
+ {list_queues, [],
+ [list_queues_local
+ ,list_queues_online
+ ,list_queues_offline
+ ]},
+ {global_parameters, [], [manage_global_parameters]}
+ ].
init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
@@ -56,6 +63,13 @@ init_per_group(list_queues, Config0) ->
Config1 = declare_some_queues(Config),
rabbit_ct_broker_helpers:stop_node(Config1, NumNodes - 1),
Config1;
+init_per_group(global_parameters,Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, ?MODULE}
+ ]),
+ rabbit_ct_helpers:run_setup_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps());
init_per_group(_, Config) ->
Config.
@@ -92,6 +106,10 @@ end_per_group(list_queues, Config0) ->
rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps());
+end_per_group(global_parameters, Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps());
end_per_group(_, Config) ->
Config.
@@ -126,6 +144,75 @@ list_queues_offline(Config) ->
assert_ctl_queues(Config, 1, ["--offline"], OfflineQueues),
ok.
+manage_global_parameters(Config) ->
+ 0 = length(global_parameters(Config)),
+ Parameter1Key = global_param1,
+ GlobalParameter1ValueAsString = "{\"a\":\"b\", \"c\":\"d\"}",
+ ok = control_action(Config, set_global_parameter,
+ [atom_to_list(Parameter1Key),
+ GlobalParameter1ValueAsString
+ ]),
+
+ 1 = length(global_parameters(Config)),
+
+ GlobalParameter1Value = rabbit_ct_broker_helpers:rpc(
+ Config, 0,
+ rabbit_runtime_parameters, value_global,
+ [Parameter1Key]
+ ),
+
+ [{<<"a">>,<<"b">>}, {<<"c">>,<<"d">>}] = GlobalParameter1Value,
+
+ Parameter2Key = global_param2,
+ GlobalParameter2ValueAsString = "{\"e\":\"f\", \"g\":\"h\"}",
+ ok = control_action(Config, set_global_parameter,
+ [atom_to_list(Parameter2Key),
+ GlobalParameter2ValueAsString
+ ]),
+
+ 2 = length(global_parameters(Config)),
+
+ GlobalParameter2Value = rabbit_ct_broker_helpers:rpc(
+ Config, 0,
+ rabbit_runtime_parameters, value_global,
+ [Parameter2Key]
+ ),
+
+ [{<<"e">>,<<"f">>}, {<<"g">>,<<"h">>}] = GlobalParameter2Value,
+
+
+ GlobalParameter1Value2AsString = "{\"a\":\"z\", \"c\":\"d\"}",
+ ok = control_action(Config, set_global_parameter,
+ [atom_to_list(Parameter1Key),
+ GlobalParameter1Value2AsString
+ ]),
+
+ 2 = length(global_parameters(Config)),
+
+ GlobalParameter1Value2 = rabbit_ct_broker_helpers:rpc(
+ Config, 0,
+ rabbit_runtime_parameters, value_global,
+ [Parameter1Key]
+ ),
+
+ [{<<"a">>,<<"z">>}, {<<"c">>,<<"d">>}] = GlobalParameter1Value2,
+
+ ok = control_action(Config, clear_global_parameter,
+ [atom_to_list(Parameter1Key)]
+ ),
+
+ 1 = length(global_parameters(Config)),
+
+ not_found = rabbit_ct_broker_helpers:rpc(
+ Config, 0,
+ rabbit_runtime_parameters, value_global,
+ [Parameter1Key]
+ ),
+
+ ok = control_action(Config, list_global_parameters, []),
+
+ ok.
+
%%----------------------------------------------------------------------------
%% Helpers
%%----------------------------------------------------------------------------
@@ -144,3 +231,17 @@ assert_ctl_queues(Config, Node, Args, Expected0) ->
run_list_queues(Config, Node, Args) ->
rabbit_ct_broker_helpers:rabbitmqctl_list(Config, Node, ["list_queues"] ++ Args ++ ["name"]).
+
+control_action(Config, Command, Args) ->
+ Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ rabbit_control_main:action(
+ Command, Node, Args, [],
+ fun (Format, Args1) ->
+ io:format(Format ++ " ...~n", Args1)
+ end).
+
+global_parameters(Config) ->
+ rabbit_ct_broker_helpers:rpc(
+ Config, 0,
+ rabbit_runtime_parameters, list_global, []
+ ). \ No newline at end of file
diff --git a/test/worker_pool_SUITE.erl b/test/worker_pool_SUITE.erl
new file mode 100644
index 0000000000..7eb4d6fd04
--- /dev/null
+++ b/test/worker_pool_SUITE.erl
@@ -0,0 +1,193 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(worker_pool_SUITE).
+
+-compile(export_all).
+-include_lib("common_test/include/ct.hrl").
+
+
+-define(POOL_SIZE, 1).
+-define(POOL_NAME, test_pool).
+
+all() ->
+ [
+ run_code_synchronously,
+ run_code_asynchronously,
+ set_timeout,
+ cancel_timeout,
+ cancel_timeout_by_setting
+ ].
+
+init_per_testcase(_, Config) ->
+ {ok, Pool} = worker_pool_sup:start_link(?POOL_SIZE, ?POOL_NAME),
+ rabbit_ct_helpers:set_config(Config, [{pool_sup, Pool}]).
+
+end_per_testcase(_, Config) ->
+ Pool = ?config(pool_sup, Config),
+ unlink(Pool),
+ exit(Pool, kill).
+
+run_code_synchronously(Config) ->
+ Self = self(),
+ Test = make_ref(),
+ Sleep = 200,
+ {Time, Result} = timer:tc(fun() ->
+ worker_pool:submit(?POOL_NAME,
+ fun() ->
+ timer:sleep(Sleep),
+ Self ! {hi, Test},
+ self()
+ end,
+ reuse)
+ end),
+ % Worker run synchronously
+ true = Time > Sleep,
+ % Worker have sent message
+ receive {hi, Test} -> ok
+ after 0 -> error(no_message_from_worker)
+ end,
+ % Worker is a separate process
+ true = (Self /= Result).
+
+run_code_asynchronously(Config) ->
+ Self = self(),
+ Test = make_ref(),
+ Sleep = 200,
+ {Time, Result} = timer:tc(fun() ->
+ worker_pool:submit_async(?POOL_NAME,
+ fun() ->
+ timer:sleep(Sleep),
+ Self ! {hi, Test},
+ self()
+ end)
+ end),
+ % Worker run synchronously
+ true = Time < Sleep,
+ % Worker have sent message
+ receive {hi, Test} -> ok
+ after Sleep + 100 -> error(no_message_from_worker)
+ end,
+ % Worker is a separate process
+ true = (Self /= Result).
+
+set_timeout(Config) ->
+ Self = self(),
+ Test = make_ref(),
+ Worker = worker_pool:submit(?POOL_NAME,
+ fun() ->
+ Worker = self(),
+ timer:sleep(100),
+ worker_pool_worker:set_timeout(
+ my_timeout, 1000,
+ fun() ->
+ Self ! {hello, self(), Test}
+ end),
+ Worker
+ end,
+ reuse),
+
+ % Timeout will occur after 1000 ms only
+ receive {hello, Worker, Test} -> exit(timeout_should_wait)
+ after 0 -> ok
+ end,
+
+ timer:sleep(1000),
+
+ receive {hello, Worker, Test} -> ok
+ after 1000 -> exit(timeout_is_late)
+ end.
+
+
+cancel_timeout(Config) ->
+ Self = self(),
+ Test = make_ref(),
+ Worker = worker_pool:submit(?POOL_NAME,
+ fun() ->
+ Worker = self(),
+ timer:sleep(100),
+ worker_pool_worker:set_timeout(
+ my_timeout, 1000,
+ fun() ->
+ Self ! {hello, self(), Test}
+ end),
+ Worker
+ end,
+ reuse),
+
+ % Timeout will occur after 1000 ms only
+ receive {hello, Worker, Test} -> exit(timeout_should_wait)
+ after 0 -> ok
+ end,
+
+ worker_pool_worker:next_job_from(Worker, Self),
+ Worker = worker_pool_worker:submit(Worker,
+ fun() ->
+ worker_pool_worker:clear_timeout(my_timeout),
+ Worker
+ end,
+ reuse),
+
+ timer:sleep(1000),
+ receive {hello, Worker, Test} -> exit(timeout_is_not_canceleld)
+ after 0 -> ok
+ end.
+
+cancel_timeout_by_setting(Config) ->
+ Self = self(),
+ Test = make_ref(),
+ Worker = worker_pool:submit(?POOL_NAME,
+ fun() ->
+ Worker = self(),
+ timer:sleep(100),
+ worker_pool_worker:set_timeout(
+ my_timeout, 1000,
+ fun() ->
+ Self ! {hello, self(), Test}
+ end),
+ Worker
+ end,
+ reuse),
+
+ % Timeout will occur after 1000 ms only
+ receive {hello, Worker, Test} -> exit(timeout_should_wait)
+ after 0 -> ok
+ end,
+
+ worker_pool_worker:next_job_from(Worker, Self),
+ Worker = worker_pool_worker:submit(Worker,
+ fun() ->
+ worker_pool_worker:set_timeout(my_timeout, 1000,
+ fun() ->
+ Self ! {hello_reset, self(), Test}
+ end),
+ Worker
+ end,
+ reuse),
+
+ timer:sleep(1000),
+ receive {hello, Worker, Test} -> exit(timeout_is_not_canceleld)
+ after 0 -> ok
+ end,
+
+ receive {hello_reset, Worker, Test} -> ok
+ after 1000 -> exit(timeout_is_late)
+ end.
+
+
+
+
+