summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/rabbitmqctl.1.xml81
-rwxr-xr-xscripts/rabbitmq-server6
-rw-r--r--scripts/rabbitmq-server.bat4
-rw-r--r--scripts/rabbitmq-service.bat6
-rw-r--r--src/rabbit_control_main.erl28
-rw-r--r--src/rabbit_runtime_parameters.erl79
-rw-r--r--src/worker_pool_worker.erl47
-rw-r--r--test/rabbitmqctl_integration_SUITE.erl113
-rw-r--r--test/worker_pool_SUITE.erl193
9 files changed, 537 insertions, 20 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index fc4927ee1c..32dc43845a 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1030,11 +1030,16 @@
<para>
Certain features of RabbitMQ (such as the federation plugin)
are controlled by dynamic,
- cluster-wide <emphasis>parameters</emphasis>. Each parameter
- consists of a component name, a name and a value, and is
- associated with a virtual host. The component name and name are
- strings, and the value is an Erlang term. Parameters can be
- set, cleared and listed. In general you should refer to the
+ cluster-wide <emphasis>parameters</emphasis>.
+ There are 2 kinds of parameters: parameters scoped to
+ a virtual host and global parameters.
+ Each vhost-scoped parameter
+ consists of a component name, a name and a value.
+ The component name and name are
+ strings, and the value is an Erlang term.
+ A global parameter consists of a name and value. The name
+ is a string and the value is an Erlang term.
+ Parameters can be set, cleared and listed. In general you should refer to the
documentation for the feature in question to see how to set
parameters.
</para>
@@ -1116,6 +1121,72 @@
</para>
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><command>set_global_parameter</command> <arg choice="req"><replaceable>name</replaceable></arg> <arg choice="req"><replaceable>value</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Sets a global runtime parameter. This is similar to <command>set_parameter</command>
+ but the key-value pair isn't tied to a virtual host.
+ </para>
+ <variablelist>
+ <varlistentry>
+ <term>name</term>
+ <listitem><para>
+ The name of the global runtime parameter being set.
+ </para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>value</term>
+ <listitem><para>
+ The value for the global runtime parameter, as a
+ JSON term. In most shells you are very likely to
+ need to quote this.
+ </para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl set_global_parameter mqtt_default_vhosts '{"O=client,CN=guest":"/"}'</screen>
+ <para role="example">
+ This command sets the global runtime parameter <command>mqtt_default_vhosts</command> to the JSON term <command>{"O=client,CN=guest":"/"}</command>.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><command>clear_global_parameter</command> <arg choice="req"><replaceable>name</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Clears a global runtime parameter. This is similar to <command>clear_global_parameter</command>
+ but the key-value pair isn't tied to a virtual host.
+ </para>
+ <variablelist>
+ <varlistentry>
+ <term>name</term>
+ <listitem><para>
+ The name of the global runtime parameter being cleared.
+ </para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl clear_global_parameter mqtt_default_vhosts</screen>
+ <para role="example">
+ This command clears the global runtime parameter <command>mqtt_default_vhosts</command>.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><command>list_global_parameters</command></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Lists all global runtime parameters. This is similar to <command>list_parameters</command>
+ but the global runtime parameters are not tied to any virtual host.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl list_global_parameters</screen>
+ <para role="example">
+ This command lists all global parameters.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
</refsect2>
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 33369616ef..cd56b67d73 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -163,6 +163,11 @@ else
RABBITMQ_LAGER_HANDLER_UPGRADE='"'${RABBITMQ_UPGRADE_LOG}'"'
fi
+# Bump ETS table limit to 50000
+if ["x" = "x$ERL_MAX_ETS_TABLES"]; then
+ ERL_MAX_ETS_TABLES=50000
+fi
+
# we need to turn off path expansion because some of the vars, notably
# RABBITMQ_SERVER_ERL_ARGS, contain terms that look like globs and
# there is no other way of preventing their expansion.
@@ -191,6 +196,7 @@ start_rabbitmq_server() {
ensure_thread_pool_size
check_start_params &&
RABBITMQ_CONFIG_FILE=$RABBITMQ_CONFIG_FILE \
+ ERL_MAX_ETS_TABLES=$ERL_MAX_ETS_TABLES \
exec ${ERL_DIR}erl \
-pa ${RABBITMQ_SERVER_CODE_PATH} ${RABBITMQ_EBIN_ROOT} \
${RABBITMQ_START_RABBIT} \
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index a15f24e586..c00dee1913 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -137,6 +137,10 @@ if "!RABBITMQ_IO_THREAD_POOL_SIZE!"=="" (
set RABBITMQ_IO_THREAD_POOL_SIZE=64
)
+rem Bump ETS table limit to 50000
+if "!ERL_MAX_ETS_TABLES!"=="" (
+ set ERL_MAX_ETS_TABLES=50000
+)
set ENV_OK=true
CALL :check_not_empty "RABBITMQ_BOOT_MODULE" !RABBITMQ_BOOT_MODULE!
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 347a5e62ae..92b3393670 100644
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -225,6 +225,11 @@ if "!RABBITMQ_SERVICE_RESTART!"=="" (
set RABBITMQ_SERVICE_RESTART=restart
)
+rem Bump ETS table limit to 50000
+if "!ERL_MAX_ETS_TABLES!"=="" (
+ set ERL_MAX_ETS_TABLES=50000
+)
+
set ERLANG_SERVICE_ARGUMENTS= ^
-pa "!RABBITMQ_EBIN_ROOT!" ^
-boot start_sasl ^
@@ -262,6 +267,7 @@ set ERLANG_SERVICE_ARGUMENTS=!ERLANG_SERVICE_ARGUMENTS:"=\"!
-machine "!ERLANG_SERVICE_MANAGER_PATH!\erl.exe" ^
-env ERL_CRASH_DUMP="!RABBITMQ_BASE:\=/!/erl_crash.dump" ^
-env ERL_LIBS="!ERL_LIBS!" ^
+-env ERL_MAX_ETS_TABLES="!ERL_MAX_ETS_TABLES!" ^
-workdir "!RABBITMQ_BASE!" ^
-stopaction "rabbit:stop_and_halt()." ^
!RABBITMQ_NAME_TYPE! !RABBITMQ_NODENAME! ^
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index babaf4fd4e..d96c1dd476 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -70,6 +70,10 @@
{clear_parameter, [?VHOST_DEF]},
{list_parameters, [?VHOST_DEF]},
+ set_global_parameter,
+ clear_global_parameter,
+ list_global_parameters,
+
{set_policy, [?VHOST_DEF, ?PRIORITY_DEF, ?APPLY_TO_DEF]},
{clear_policy, [?VHOST_DEF]},
{set_operator_policy, [?VHOST_DEF, ?PRIORITY_DEF, ?APPLY_TO_DEF]},
@@ -126,7 +130,7 @@
-define(COMMANDS_WITH_TIMEOUT,
[list_user_permissions, list_policies, list_queues, list_exchanges,
list_bindings, list_connections, list_channels, list_consumers,
- list_vhosts, list_parameters,
+ list_vhosts, list_parameters, list_global_parameters,
purge_queue,
{node_health_check, 70000}]).
@@ -282,7 +286,7 @@ action(stop, Node, Args, _Opts, Inform) ->
Res;
action(stop_app, Node, [], _Opts, Inform) ->
- Inform("Stopping node ~p", [Node]),
+ Inform("Stopping rabbit application on node ~p", [Node]),
call(Node, {rabbit, stop, []});
action(start_app, Node, [], _Opts, Inform) ->
@@ -529,6 +533,20 @@ action(clear_parameter, Node, [Component, Key], Opts, Inform) ->
list_to_binary(Component),
list_to_binary(Key)]);
+action(set_global_parameter, Node, [Key, Value], _Opts, Inform) ->
+ Inform("Setting global runtime parameter ~p to ~p", [Key, Value]),
+ rpc_call(
+ Node, rabbit_runtime_parameters, parse_set_global,
+ [rabbit_data_coercion:to_atom(Key), rabbit_data_coercion:to_binary(Value)]
+ );
+
+action(clear_global_parameter, Node, [Key], _Opts, Inform) ->
+ Inform("Clearing global runtime parameter ~p", [Key]),
+ rpc_call(
+ Node, rabbit_runtime_parameters, clear_global,
+ [rabbit_data_coercion:to_atom(Key)]
+ );
+
action(set_policy, Node, [Key, Pattern, Defn], Opts, Inform) ->
Msg = "Setting policy ~p for pattern ~p to ~p with priority ~p",
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
@@ -659,6 +677,12 @@ action(list_parameters, Node, [], Opts, Inform, Timeout) ->
rabbit_runtime_parameters:info_keys(),
[{timeout, Timeout}]);
+action(list_global_parameters, Node, [], _Opts, Inform, Timeout) ->
+ Inform("Listing global runtime parameters", []),
+ call_emitter(Node, {rabbit_runtime_parameters, list_global_formatted, []},
+ rabbit_runtime_parameters:global_info_keys(),
+ [{timeout, Timeout}]);
+
action(list_policies, Node, [], Opts, Inform, Timeout) ->
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
Inform("Listing policies", []),
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
index 072a48be3d..94018a5b54 100644
--- a/src/rabbit_runtime_parameters.erl
+++ b/src/rabbit_runtime_parameters.erl
@@ -40,6 +40,8 @@
%% Parameters are stored in Mnesia and can be global. Their changes
%% are broadcasted over rabbit_event.
%%
+%% Global parameters keys are atoms and values are JSON documents.
+%%
%% See also:
%%
%% * rabbit_policies
@@ -53,7 +55,9 @@
list_component/1, list/2, list_formatted/1, list_formatted/3,
lookup/3, value/3, value/4, info_keys/0, clear_component/1]).
--export([set_global/2, value_global/1, value_global/2]).
+-export([parse_set_global/2, set_global/2, value_global/1, value_global/2,
+ list_global/0, list_global_formatted/0, list_global_formatted/2,
+ lookup_global/1, global_info_keys/0, clear_global/1]).
%%----------------------------------------------------------------------------
@@ -109,10 +113,19 @@ set(_, <<"policy">>, _, _, _) ->
set(VHost, Component, Name, Term, User) ->
set_any(VHost, Component, Name, Term, User).
-set_global(Name, Term) ->
- mnesia_update(Name, Term),
- event_notify(parameter_set, none, global, [{name, Name},
- {value, Term}]),
+parse_set_global(Name, String) ->
+ Definition = rabbit_data_coercion:to_binary(String),
+ case rabbit_json:try_decode(Definition) of
+ {ok, Term} when is_map(Term) -> set_global(Name, maps:to_list(Term));
+ {ok, Term} -> set_global(Name, Term);
+ error -> {error_string, "JSON decoding error"}
+ end.
+
+set_global(Name, Term) ->
+ NameAsAtom = rabbit_data_coercion:to_atom(Name),
+ mnesia_update(NameAsAtom, Term),
+ event_notify(parameter_set, none, global, [{name, NameAsAtom},
+ {value, Term}]),
ok.
format_error(L) ->
@@ -168,6 +181,26 @@ clear(_, <<"policy">> , _) ->
clear(VHost, Component, Name) ->
clear_any(VHost, Component, Name).
+clear_global(Key) ->
+ KeyAsAtom = rabbit_data_coercion:to_atom(Key),
+ Notify = fun() ->
+ event_notify(parameter_set, none, global, [{name, KeyAsAtom}]),
+ ok
+ end,
+ case value_global(KeyAsAtom) of
+ not_found ->
+ {error_string, "Parameter does not exist"};
+ _ ->
+ F = fun () ->
+ ok = mnesia:delete(?TABLE, KeyAsAtom, write)
+ end,
+ ok = rabbit_misc:execute_mnesia_transaction(F),
+ case mnesia:is_transaction() of
+ true -> Notify;
+ false -> Notify()
+ end
+ end.
+
clear_component(Component) ->
case rabbit_runtime_parameters:list_component(Component) of
[] ->
@@ -235,6 +268,15 @@ list(VHost, Component) ->
Comp =/= <<"policy">> orelse Component =:= <<"policy">>]
end).
+list_global() ->
+ %% list only atom keys
+ mnesia:async_dirty(
+ fun () ->
+ Match = #runtime_parameters{key = '_', _ = '_'},
+ [p(P) || P <- mnesia:match_object(?TABLE, Match, read),
+ is_atom(P#runtime_parameters.key)]
+ end).
+
list_formatted(VHost) ->
[pset(value, rabbit_json:encode(pget(value, P)), P) || P <- list(VHost)].
@@ -243,17 +285,34 @@ list_formatted(VHost, Ref, AggregatorPid) ->
AggregatorPid, Ref,
fun(P) -> pset(value, rabbit_json:encode(pget(value, P)), P) end, list(VHost)).
+list_global_formatted() ->
+ [pset(value, rabbit_json:encode(pget(value, P)), P) || P <- list_global()].
+
+list_global_formatted(Ref, AggregatorPid) ->
+ rabbit_control_misc:emitting_map(
+ AggregatorPid, Ref,
+ fun(P) -> pset(value, rabbit_json:encode(pget(value, P)), P) end, list_global()).
+
lookup(VHost, Component, Name) ->
case lookup0({VHost, Component, Name}, rabbit_misc:const(not_found)) of
not_found -> not_found;
Params -> p(Params)
end.
+lookup_global(Name) ->
+ case lookup0(Name, rabbit_misc:const(not_found)) of
+ not_found -> not_found;
+ Params -> p(Params)
+ end.
+
value(VHost, Comp, Name) -> value0({VHost, Comp, Name}).
value(VHost, Comp, Name, Def) -> value0({VHost, Comp, Name}, Def).
-value_global(Key) -> value0(Key).
-value_global(Key, Default) -> value0(Key, Default).
+value_global(Key) ->
+ value0(Key).
+
+value_global(Key, Default) ->
+ value0(Key, Default).
value0(Key) ->
case lookup0(Key, rabbit_misc:const(not_found)) of
@@ -290,10 +349,16 @@ p(#runtime_parameters{key = {VHost, Component, Name}, value = Value}) ->
[{vhost, VHost},
{component, Component},
{name, Name},
+ {value, Value}];
+
+p(#runtime_parameters{key = Key, value = Value}) when is_atom(Key) ->
+ [{name, Key},
{value, Value}].
info_keys() -> [component, name, value].
+global_info_keys() -> [name, value].
+
%%---------------------------------------------------------------------------
lookup_component(Component) ->
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index bd07f0d782..c515abe07f 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -27,6 +27,7 @@
run/1]).
-export([set_maximum_since_use/2]).
+-export([set_timeout/2, set_timeout/3, clear_timeout/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, prioritise_cast/3]).
@@ -136,6 +137,11 @@ handle_info({'DOWN', MRef, process, CPid, _Reason}, {from, CPid, MRef}) ->
handle_info({'DOWN', _MRef, process, _Pid, _Reason}, State) ->
{noreply, State, hibernate};
+handle_info({timeout, Key, Fun}, State) ->
+ clear_timeout(Key),
+ Fun(),
+ {noreply, State, hibernate};
+
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
@@ -144,3 +150,44 @@ code_change(_OldVsn, State, _Extra) ->
terminate(_Reason, State) ->
State.
+
+-spec set_timeout(integer(), fun(() -> any())) -> reference().
+set_timeout(Time, Fun) ->
+ Key = make_ref(),
+ set_timeout(Key, Time, Fun).
+
+-spec set_timeout(Key, integer(), fun(() -> any())) -> Key when Key :: any().
+set_timeout(Key, Time, Fun) ->
+ Timeouts = get_timeouts(),
+ set_timeout(Key, Time, Fun, Timeouts).
+
+-spec clear_timeout(any()) -> ok.
+clear_timeout(Key) ->
+ NewTimeouts = cancel_timeout(Key, get_timeouts()),
+ put(timeouts, NewTimeouts),
+ ok.
+
+get_timeouts() ->
+ case get(timeouts) of
+ undefined -> dict:new();
+ Dict -> Dict
+ end.
+
+set_timeout(Key, Time, Fun, Timeouts) ->
+ cancel_timeout(Key, Timeouts),
+ {ok, TRef} = timer:send_after(Time, {timeout, Key, Fun}),
+ NewTimeouts = dict:store(Key, TRef, Timeouts),
+ put(timeouts, NewTimeouts),
+ {ok, Key}.
+
+cancel_timeout(Key, Timeouts) ->
+ case dict:find(Key, Timeouts) of
+ {ok, TRef} ->
+ timer:cancel(TRef),
+ receive {timeout, Key, _} -> ok
+ after 0 -> ok
+ end,
+ dict:erase(Key, Timeouts);
+ error ->
+ Timeouts
+ end.
diff --git a/test/rabbitmqctl_integration_SUITE.erl b/test/rabbitmqctl_integration_SUITE.erl
index 9305781bda..ef85472f48 100644
--- a/test/rabbitmqctl_integration_SUITE.erl
+++ b/test/rabbitmqctl_integration_SUITE.erl
@@ -31,17 +31,24 @@
-export([list_queues_local/1
,list_queues_offline/1
,list_queues_online/1
+ ,manage_global_parameters/1
]).
all() ->
- [{group, list_queues}].
+ [
+ {group, list_queues},
+ {group, global_parameters}
+ ].
groups() ->
- [{list_queues, [],
- [list_queues_local
- ,list_queues_online
- ,list_queues_offline
- ]}].
+ [
+ {list_queues, [],
+ [list_queues_local
+ ,list_queues_online
+ ,list_queues_offline
+ ]},
+ {global_parameters, [], [manage_global_parameters]}
+ ].
init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
@@ -56,6 +63,13 @@ init_per_group(list_queues, Config0) ->
Config1 = declare_some_queues(Config),
rabbit_ct_broker_helpers:stop_node(Config1, NumNodes - 1),
Config1;
+init_per_group(global_parameters,Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, ?MODULE}
+ ]),
+ rabbit_ct_helpers:run_setup_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps());
init_per_group(_, Config) ->
Config.
@@ -92,6 +106,10 @@ end_per_group(list_queues, Config0) ->
rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps());
+end_per_group(global_parameters, Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps());
end_per_group(_, Config) ->
Config.
@@ -126,6 +144,75 @@ list_queues_offline(Config) ->
assert_ctl_queues(Config, 1, ["--offline"], OfflineQueues),
ok.
+manage_global_parameters(Config) ->
+ 0 = length(global_parameters(Config)),
+ Parameter1Key = global_param1,
+ GlobalParameter1ValueAsString = "{\"a\":\"b\", \"c\":\"d\"}",
+ ok = control_action(Config, set_global_parameter,
+ [atom_to_list(Parameter1Key),
+ GlobalParameter1ValueAsString
+ ]),
+
+ 1 = length(global_parameters(Config)),
+
+ GlobalParameter1Value = rabbit_ct_broker_helpers:rpc(
+ Config, 0,
+ rabbit_runtime_parameters, value_global,
+ [Parameter1Key]
+ ),
+
+ [{<<"a">>,<<"b">>}, {<<"c">>,<<"d">>}] = GlobalParameter1Value,
+
+ Parameter2Key = global_param2,
+ GlobalParameter2ValueAsString = "{\"e\":\"f\", \"g\":\"h\"}",
+ ok = control_action(Config, set_global_parameter,
+ [atom_to_list(Parameter2Key),
+ GlobalParameter2ValueAsString
+ ]),
+
+ 2 = length(global_parameters(Config)),
+
+ GlobalParameter2Value = rabbit_ct_broker_helpers:rpc(
+ Config, 0,
+ rabbit_runtime_parameters, value_global,
+ [Parameter2Key]
+ ),
+
+ [{<<"e">>,<<"f">>}, {<<"g">>,<<"h">>}] = GlobalParameter2Value,
+
+
+ GlobalParameter1Value2AsString = "{\"a\":\"z\", \"c\":\"d\"}",
+ ok = control_action(Config, set_global_parameter,
+ [atom_to_list(Parameter1Key),
+ GlobalParameter1Value2AsString
+ ]),
+
+ 2 = length(global_parameters(Config)),
+
+ GlobalParameter1Value2 = rabbit_ct_broker_helpers:rpc(
+ Config, 0,
+ rabbit_runtime_parameters, value_global,
+ [Parameter1Key]
+ ),
+
+ [{<<"a">>,<<"z">>}, {<<"c">>,<<"d">>}] = GlobalParameter1Value2,
+
+ ok = control_action(Config, clear_global_parameter,
+ [atom_to_list(Parameter1Key)]
+ ),
+
+ 1 = length(global_parameters(Config)),
+
+ not_found = rabbit_ct_broker_helpers:rpc(
+ Config, 0,
+ rabbit_runtime_parameters, value_global,
+ [Parameter1Key]
+ ),
+
+ ok = control_action(Config, list_global_parameters, []),
+
+ ok.
+
%%----------------------------------------------------------------------------
%% Helpers
%%----------------------------------------------------------------------------
@@ -144,3 +231,17 @@ assert_ctl_queues(Config, Node, Args, Expected0) ->
run_list_queues(Config, Node, Args) ->
rabbit_ct_broker_helpers:rabbitmqctl_list(Config, Node, ["list_queues"] ++ Args ++ ["name"]).
+
+control_action(Config, Command, Args) ->
+ Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ rabbit_control_main:action(
+ Command, Node, Args, [],
+ fun (Format, Args1) ->
+ io:format(Format ++ " ...~n", Args1)
+ end).
+
+global_parameters(Config) ->
+ rabbit_ct_broker_helpers:rpc(
+ Config, 0,
+ rabbit_runtime_parameters, list_global, []
+ ). \ No newline at end of file
diff --git a/test/worker_pool_SUITE.erl b/test/worker_pool_SUITE.erl
new file mode 100644
index 0000000000..7eb4d6fd04
--- /dev/null
+++ b/test/worker_pool_SUITE.erl
@@ -0,0 +1,193 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(worker_pool_SUITE).
+
+-compile(export_all).
+-include_lib("common_test/include/ct.hrl").
+
+
+-define(POOL_SIZE, 1).
+-define(POOL_NAME, test_pool).
+
+all() ->
+ [
+ run_code_synchronously,
+ run_code_asynchronously,
+ set_timeout,
+ cancel_timeout,
+ cancel_timeout_by_setting
+ ].
+
+init_per_testcase(_, Config) ->
+ {ok, Pool} = worker_pool_sup:start_link(?POOL_SIZE, ?POOL_NAME),
+ rabbit_ct_helpers:set_config(Config, [{pool_sup, Pool}]).
+
+end_per_testcase(_, Config) ->
+ Pool = ?config(pool_sup, Config),
+ unlink(Pool),
+ exit(Pool, kill).
+
+run_code_synchronously(Config) ->
+ Self = self(),
+ Test = make_ref(),
+ Sleep = 200,
+ {Time, Result} = timer:tc(fun() ->
+ worker_pool:submit(?POOL_NAME,
+ fun() ->
+ timer:sleep(Sleep),
+ Self ! {hi, Test},
+ self()
+ end,
+ reuse)
+ end),
+ % Worker run synchronously
+ true = Time > Sleep,
+ % Worker have sent message
+ receive {hi, Test} -> ok
+ after 0 -> error(no_message_from_worker)
+ end,
+ % Worker is a separate process
+ true = (Self /= Result).
+
+run_code_asynchronously(Config) ->
+ Self = self(),
+ Test = make_ref(),
+ Sleep = 200,
+ {Time, Result} = timer:tc(fun() ->
+ worker_pool:submit_async(?POOL_NAME,
+ fun() ->
+ timer:sleep(Sleep),
+ Self ! {hi, Test},
+ self()
+ end)
+ end),
+ % Worker run synchronously
+ true = Time < Sleep,
+ % Worker have sent message
+ receive {hi, Test} -> ok
+ after Sleep + 100 -> error(no_message_from_worker)
+ end,
+ % Worker is a separate process
+ true = (Self /= Result).
+
+set_timeout(Config) ->
+ Self = self(),
+ Test = make_ref(),
+ Worker = worker_pool:submit(?POOL_NAME,
+ fun() ->
+ Worker = self(),
+ timer:sleep(100),
+ worker_pool_worker:set_timeout(
+ my_timeout, 1000,
+ fun() ->
+ Self ! {hello, self(), Test}
+ end),
+ Worker
+ end,
+ reuse),
+
+ % Timeout will occur after 1000 ms only
+ receive {hello, Worker, Test} -> exit(timeout_should_wait)
+ after 0 -> ok
+ end,
+
+ timer:sleep(1000),
+
+ receive {hello, Worker, Test} -> ok
+ after 1000 -> exit(timeout_is_late)
+ end.
+
+
+cancel_timeout(Config) ->
+ Self = self(),
+ Test = make_ref(),
+ Worker = worker_pool:submit(?POOL_NAME,
+ fun() ->
+ Worker = self(),
+ timer:sleep(100),
+ worker_pool_worker:set_timeout(
+ my_timeout, 1000,
+ fun() ->
+ Self ! {hello, self(), Test}
+ end),
+ Worker
+ end,
+ reuse),
+
+ % Timeout will occur after 1000 ms only
+ receive {hello, Worker, Test} -> exit(timeout_should_wait)
+ after 0 -> ok
+ end,
+
+ worker_pool_worker:next_job_from(Worker, Self),
+ Worker = worker_pool_worker:submit(Worker,
+ fun() ->
+ worker_pool_worker:clear_timeout(my_timeout),
+ Worker
+ end,
+ reuse),
+
+ timer:sleep(1000),
+ receive {hello, Worker, Test} -> exit(timeout_is_not_canceleld)
+ after 0 -> ok
+ end.
+
+cancel_timeout_by_setting(Config) ->
+ Self = self(),
+ Test = make_ref(),
+ Worker = worker_pool:submit(?POOL_NAME,
+ fun() ->
+ Worker = self(),
+ timer:sleep(100),
+ worker_pool_worker:set_timeout(
+ my_timeout, 1000,
+ fun() ->
+ Self ! {hello, self(), Test}
+ end),
+ Worker
+ end,
+ reuse),
+
+ % Timeout will occur after 1000 ms only
+ receive {hello, Worker, Test} -> exit(timeout_should_wait)
+ after 0 -> ok
+ end,
+
+ worker_pool_worker:next_job_from(Worker, Self),
+ Worker = worker_pool_worker:submit(Worker,
+ fun() ->
+ worker_pool_worker:set_timeout(my_timeout, 1000,
+ fun() ->
+ Self ! {hello_reset, self(), Test}
+ end),
+ Worker
+ end,
+ reuse),
+
+ timer:sleep(1000),
+ receive {hello, Worker, Test} -> exit(timeout_is_not_canceleld)
+ after 0 -> ok
+ end,
+
+ receive {hello_reset, Worker, Test} -> ok
+ after 1000 -> exit(timeout_is_late)
+ end.
+
+
+
+
+