diff options
| -rw-r--r-- | docs/rabbitmqctl.1.xml | 81 | ||||
| -rwxr-xr-x | scripts/rabbitmq-server | 6 | ||||
| -rw-r--r-- | scripts/rabbitmq-server.bat | 4 | ||||
| -rw-r--r-- | scripts/rabbitmq-service.bat | 6 | ||||
| -rw-r--r-- | src/rabbit_control_main.erl | 28 | ||||
| -rw-r--r-- | src/rabbit_runtime_parameters.erl | 79 | ||||
| -rw-r--r-- | src/worker_pool_worker.erl | 47 | ||||
| -rw-r--r-- | test/rabbitmqctl_integration_SUITE.erl | 113 | ||||
| -rw-r--r-- | test/worker_pool_SUITE.erl | 193 |
9 files changed, 537 insertions, 20 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index fc4927ee1c..32dc43845a 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1030,11 +1030,16 @@ <para> Certain features of RabbitMQ (such as the federation plugin) are controlled by dynamic, - cluster-wide <emphasis>parameters</emphasis>. Each parameter - consists of a component name, a name and a value, and is - associated with a virtual host. The component name and name are - strings, and the value is an Erlang term. Parameters can be - set, cleared and listed. In general you should refer to the + cluster-wide <emphasis>parameters</emphasis>. + There are 2 kinds of parameters: parameters scoped to + a virtual host and global parameters. + Each vhost-scoped parameter + consists of a component name, a name and a value. + The component name and name are + strings, and the value is an Erlang term. + A global parameter consists of a name and value. The name + is a string and the value is an Erlang term. + Parameters can be set, cleared and listed. In general you should refer to the documentation for the feature in question to see how to set parameters. </para> @@ -1116,6 +1121,72 @@ </para> </listitem> </varlistentry> + <varlistentry> + <term><cmdsynopsis><command>set_global_parameter</command> <arg choice="req"><replaceable>name</replaceable></arg> <arg choice="req"><replaceable>value</replaceable></arg></cmdsynopsis></term> + <listitem> + <para> + Sets a global runtime parameter. This is similar to <command>set_parameter</command> + but the key-value pair isn't tied to a virtual host. + </para> + <variablelist> + <varlistentry> + <term>name</term> + <listitem><para> + The name of the global runtime parameter being set. + </para></listitem> + </varlistentry> + <varlistentry> + <term>value</term> + <listitem><para> + The value for the global runtime parameter, as a + JSON term. In most shells you are very likely to + need to quote this. + </para></listitem> + </varlistentry> + </variablelist> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl set_global_parameter mqtt_default_vhosts '{"O=client,CN=guest":"/"}'</screen> + <para role="example"> + This command sets the global runtime parameter <command>mqtt_default_vhosts</command> to the JSON term <command>{"O=client,CN=guest":"/"}</command>. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><cmdsynopsis><command>clear_global_parameter</command> <arg choice="req"><replaceable>name</replaceable></arg></cmdsynopsis></term> + <listitem> + <para> + Clears a global runtime parameter. This is similar to <command>clear_global_parameter</command> + but the key-value pair isn't tied to a virtual host. + </para> + <variablelist> + <varlistentry> + <term>name</term> + <listitem><para> + The name of the global runtime parameter being cleared. + </para></listitem> + </varlistentry> + </variablelist> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl clear_global_parameter mqtt_default_vhosts</screen> + <para role="example"> + This command clears the global runtime parameter <command>mqtt_default_vhosts</command>. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><cmdsynopsis><command>list_global_parameters</command></cmdsynopsis></term> + <listitem> + <para> + Lists all global runtime parameters. This is similar to <command>list_parameters</command> + but the global runtime parameters are not tied to any virtual host. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl list_global_parameters</screen> + <para role="example"> + This command lists all global parameters. + </para> + </listitem> + </varlistentry> </variablelist> </refsect2> diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 33369616ef..cd56b67d73 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -163,6 +163,11 @@ else RABBITMQ_LAGER_HANDLER_UPGRADE='"'${RABBITMQ_UPGRADE_LOG}'"' fi +# Bump ETS table limit to 50000 +if ["x" = "x$ERL_MAX_ETS_TABLES"]; then + ERL_MAX_ETS_TABLES=50000 +fi + # we need to turn off path expansion because some of the vars, notably # RABBITMQ_SERVER_ERL_ARGS, contain terms that look like globs and # there is no other way of preventing their expansion. @@ -191,6 +196,7 @@ start_rabbitmq_server() { ensure_thread_pool_size check_start_params && RABBITMQ_CONFIG_FILE=$RABBITMQ_CONFIG_FILE \ + ERL_MAX_ETS_TABLES=$ERL_MAX_ETS_TABLES \ exec ${ERL_DIR}erl \ -pa ${RABBITMQ_SERVER_CODE_PATH} ${RABBITMQ_EBIN_ROOT} \ ${RABBITMQ_START_RABBIT} \ diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index a15f24e586..c00dee1913 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -137,6 +137,10 @@ if "!RABBITMQ_IO_THREAD_POOL_SIZE!"=="" ( set RABBITMQ_IO_THREAD_POOL_SIZE=64
)
+rem Bump ETS table limit to 50000
+if "!ERL_MAX_ETS_TABLES!"=="" (
+ set ERL_MAX_ETS_TABLES=50000
+)
set ENV_OK=true
CALL :check_not_empty "RABBITMQ_BOOT_MODULE" !RABBITMQ_BOOT_MODULE!
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index 347a5e62ae..92b3393670 100644 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -225,6 +225,11 @@ if "!RABBITMQ_SERVICE_RESTART!"=="" ( set RABBITMQ_SERVICE_RESTART=restart
)
+rem Bump ETS table limit to 50000
+if "!ERL_MAX_ETS_TABLES!"=="" (
+ set ERL_MAX_ETS_TABLES=50000
+)
+
set ERLANG_SERVICE_ARGUMENTS= ^
-pa "!RABBITMQ_EBIN_ROOT!" ^
-boot start_sasl ^
@@ -262,6 +267,7 @@ set ERLANG_SERVICE_ARGUMENTS=!ERLANG_SERVICE_ARGUMENTS:"=\"! -machine "!ERLANG_SERVICE_MANAGER_PATH!\erl.exe" ^
-env ERL_CRASH_DUMP="!RABBITMQ_BASE:\=/!/erl_crash.dump" ^
-env ERL_LIBS="!ERL_LIBS!" ^
+-env ERL_MAX_ETS_TABLES="!ERL_MAX_ETS_TABLES!" ^
-workdir "!RABBITMQ_BASE!" ^
-stopaction "rabbit:stop_and_halt()." ^
!RABBITMQ_NAME_TYPE! !RABBITMQ_NODENAME! ^
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index babaf4fd4e..d96c1dd476 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -70,6 +70,10 @@ {clear_parameter, [?VHOST_DEF]}, {list_parameters, [?VHOST_DEF]}, + set_global_parameter, + clear_global_parameter, + list_global_parameters, + {set_policy, [?VHOST_DEF, ?PRIORITY_DEF, ?APPLY_TO_DEF]}, {clear_policy, [?VHOST_DEF]}, {set_operator_policy, [?VHOST_DEF, ?PRIORITY_DEF, ?APPLY_TO_DEF]}, @@ -126,7 +130,7 @@ -define(COMMANDS_WITH_TIMEOUT, [list_user_permissions, list_policies, list_queues, list_exchanges, list_bindings, list_connections, list_channels, list_consumers, - list_vhosts, list_parameters, + list_vhosts, list_parameters, list_global_parameters, purge_queue, {node_health_check, 70000}]). @@ -282,7 +286,7 @@ action(stop, Node, Args, _Opts, Inform) -> Res; action(stop_app, Node, [], _Opts, Inform) -> - Inform("Stopping node ~p", [Node]), + Inform("Stopping rabbit application on node ~p", [Node]), call(Node, {rabbit, stop, []}); action(start_app, Node, [], _Opts, Inform) -> @@ -529,6 +533,20 @@ action(clear_parameter, Node, [Component, Key], Opts, Inform) -> list_to_binary(Component), list_to_binary(Key)]); +action(set_global_parameter, Node, [Key, Value], _Opts, Inform) -> + Inform("Setting global runtime parameter ~p to ~p", [Key, Value]), + rpc_call( + Node, rabbit_runtime_parameters, parse_set_global, + [rabbit_data_coercion:to_atom(Key), rabbit_data_coercion:to_binary(Value)] + ); + +action(clear_global_parameter, Node, [Key], _Opts, Inform) -> + Inform("Clearing global runtime parameter ~p", [Key]), + rpc_call( + Node, rabbit_runtime_parameters, clear_global, + [rabbit_data_coercion:to_atom(Key)] + ); + action(set_policy, Node, [Key, Pattern, Defn], Opts, Inform) -> Msg = "Setting policy ~p for pattern ~p to ~p with priority ~p", VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), @@ -659,6 +677,12 @@ action(list_parameters, Node, [], Opts, Inform, Timeout) -> rabbit_runtime_parameters:info_keys(), [{timeout, Timeout}]); +action(list_global_parameters, Node, [], _Opts, Inform, Timeout) -> + Inform("Listing global runtime parameters", []), + call_emitter(Node, {rabbit_runtime_parameters, list_global_formatted, []}, + rabbit_runtime_parameters:global_info_keys(), + [{timeout, Timeout}]); + action(list_policies, Node, [], Opts, Inform, Timeout) -> VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), Inform("Listing policies", []), diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl index 072a48be3d..94018a5b54 100644 --- a/src/rabbit_runtime_parameters.erl +++ b/src/rabbit_runtime_parameters.erl @@ -40,6 +40,8 @@ %% Parameters are stored in Mnesia and can be global. Their changes %% are broadcasted over rabbit_event. %% +%% Global parameters keys are atoms and values are JSON documents. +%% %% See also: %% %% * rabbit_policies @@ -53,7 +55,9 @@ list_component/1, list/2, list_formatted/1, list_formatted/3, lookup/3, value/3, value/4, info_keys/0, clear_component/1]). --export([set_global/2, value_global/1, value_global/2]). +-export([parse_set_global/2, set_global/2, value_global/1, value_global/2, + list_global/0, list_global_formatted/0, list_global_formatted/2, + lookup_global/1, global_info_keys/0, clear_global/1]). %%---------------------------------------------------------------------------- @@ -109,10 +113,19 @@ set(_, <<"policy">>, _, _, _) -> set(VHost, Component, Name, Term, User) -> set_any(VHost, Component, Name, Term, User). -set_global(Name, Term) -> - mnesia_update(Name, Term), - event_notify(parameter_set, none, global, [{name, Name}, - {value, Term}]), +parse_set_global(Name, String) -> + Definition = rabbit_data_coercion:to_binary(String), + case rabbit_json:try_decode(Definition) of + {ok, Term} when is_map(Term) -> set_global(Name, maps:to_list(Term)); + {ok, Term} -> set_global(Name, Term); + error -> {error_string, "JSON decoding error"} + end. + +set_global(Name, Term) -> + NameAsAtom = rabbit_data_coercion:to_atom(Name), + mnesia_update(NameAsAtom, Term), + event_notify(parameter_set, none, global, [{name, NameAsAtom}, + {value, Term}]), ok. format_error(L) -> @@ -168,6 +181,26 @@ clear(_, <<"policy">> , _) -> clear(VHost, Component, Name) -> clear_any(VHost, Component, Name). +clear_global(Key) -> + KeyAsAtom = rabbit_data_coercion:to_atom(Key), + Notify = fun() -> + event_notify(parameter_set, none, global, [{name, KeyAsAtom}]), + ok + end, + case value_global(KeyAsAtom) of + not_found -> + {error_string, "Parameter does not exist"}; + _ -> + F = fun () -> + ok = mnesia:delete(?TABLE, KeyAsAtom, write) + end, + ok = rabbit_misc:execute_mnesia_transaction(F), + case mnesia:is_transaction() of + true -> Notify; + false -> Notify() + end + end. + clear_component(Component) -> case rabbit_runtime_parameters:list_component(Component) of [] -> @@ -235,6 +268,15 @@ list(VHost, Component) -> Comp =/= <<"policy">> orelse Component =:= <<"policy">>] end). +list_global() -> + %% list only atom keys + mnesia:async_dirty( + fun () -> + Match = #runtime_parameters{key = '_', _ = '_'}, + [p(P) || P <- mnesia:match_object(?TABLE, Match, read), + is_atom(P#runtime_parameters.key)] + end). + list_formatted(VHost) -> [pset(value, rabbit_json:encode(pget(value, P)), P) || P <- list(VHost)]. @@ -243,17 +285,34 @@ list_formatted(VHost, Ref, AggregatorPid) -> AggregatorPid, Ref, fun(P) -> pset(value, rabbit_json:encode(pget(value, P)), P) end, list(VHost)). +list_global_formatted() -> + [pset(value, rabbit_json:encode(pget(value, P)), P) || P <- list_global()]. + +list_global_formatted(Ref, AggregatorPid) -> + rabbit_control_misc:emitting_map( + AggregatorPid, Ref, + fun(P) -> pset(value, rabbit_json:encode(pget(value, P)), P) end, list_global()). + lookup(VHost, Component, Name) -> case lookup0({VHost, Component, Name}, rabbit_misc:const(not_found)) of not_found -> not_found; Params -> p(Params) end. +lookup_global(Name) -> + case lookup0(Name, rabbit_misc:const(not_found)) of + not_found -> not_found; + Params -> p(Params) + end. + value(VHost, Comp, Name) -> value0({VHost, Comp, Name}). value(VHost, Comp, Name, Def) -> value0({VHost, Comp, Name}, Def). -value_global(Key) -> value0(Key). -value_global(Key, Default) -> value0(Key, Default). +value_global(Key) -> + value0(Key). + +value_global(Key, Default) -> + value0(Key, Default). value0(Key) -> case lookup0(Key, rabbit_misc:const(not_found)) of @@ -290,10 +349,16 @@ p(#runtime_parameters{key = {VHost, Component, Name}, value = Value}) -> [{vhost, VHost}, {component, Component}, {name, Name}, + {value, Value}]; + +p(#runtime_parameters{key = Key, value = Value}) when is_atom(Key) -> + [{name, Key}, {value, Value}]. info_keys() -> [component, name, value]. +global_info_keys() -> [name, value]. + %%--------------------------------------------------------------------------- lookup_component(Component) -> diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index bd07f0d782..c515abe07f 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -27,6 +27,7 @@ run/1]). -export([set_maximum_since_use/2]). +-export([set_timeout/2, set_timeout/3, clear_timeout/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_cast/3]). @@ -136,6 +137,11 @@ handle_info({'DOWN', MRef, process, CPid, _Reason}, {from, CPid, MRef}) -> handle_info({'DOWN', _MRef, process, _Pid, _Reason}, State) -> {noreply, State, hibernate}; +handle_info({timeout, Key, Fun}, State) -> + clear_timeout(Key), + Fun(), + {noreply, State, hibernate}; + handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. @@ -144,3 +150,44 @@ code_change(_OldVsn, State, _Extra) -> terminate(_Reason, State) -> State. + +-spec set_timeout(integer(), fun(() -> any())) -> reference(). +set_timeout(Time, Fun) -> + Key = make_ref(), + set_timeout(Key, Time, Fun). + +-spec set_timeout(Key, integer(), fun(() -> any())) -> Key when Key :: any(). +set_timeout(Key, Time, Fun) -> + Timeouts = get_timeouts(), + set_timeout(Key, Time, Fun, Timeouts). + +-spec clear_timeout(any()) -> ok. +clear_timeout(Key) -> + NewTimeouts = cancel_timeout(Key, get_timeouts()), + put(timeouts, NewTimeouts), + ok. + +get_timeouts() -> + case get(timeouts) of + undefined -> dict:new(); + Dict -> Dict + end. + +set_timeout(Key, Time, Fun, Timeouts) -> + cancel_timeout(Key, Timeouts), + {ok, TRef} = timer:send_after(Time, {timeout, Key, Fun}), + NewTimeouts = dict:store(Key, TRef, Timeouts), + put(timeouts, NewTimeouts), + {ok, Key}. + +cancel_timeout(Key, Timeouts) -> + case dict:find(Key, Timeouts) of + {ok, TRef} -> + timer:cancel(TRef), + receive {timeout, Key, _} -> ok + after 0 -> ok + end, + dict:erase(Key, Timeouts); + error -> + Timeouts + end. diff --git a/test/rabbitmqctl_integration_SUITE.erl b/test/rabbitmqctl_integration_SUITE.erl index 9305781bda..ef85472f48 100644 --- a/test/rabbitmqctl_integration_SUITE.erl +++ b/test/rabbitmqctl_integration_SUITE.erl @@ -31,17 +31,24 @@ -export([list_queues_local/1 ,list_queues_offline/1 ,list_queues_online/1 + ,manage_global_parameters/1 ]). all() -> - [{group, list_queues}]. + [ + {group, list_queues}, + {group, global_parameters} + ]. groups() -> - [{list_queues, [], - [list_queues_local - ,list_queues_online - ,list_queues_offline - ]}]. + [ + {list_queues, [], + [list_queues_local + ,list_queues_online + ,list_queues_offline + ]}, + {global_parameters, [], [manage_global_parameters]} + ]. init_per_suite(Config) -> rabbit_ct_helpers:log_environment(), @@ -56,6 +63,13 @@ init_per_group(list_queues, Config0) -> Config1 = declare_some_queues(Config), rabbit_ct_broker_helpers:stop_node(Config1, NumNodes - 1), Config1; +init_per_group(global_parameters,Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, ?MODULE} + ]), + rabbit_ct_helpers:run_setup_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()); init_per_group(_, Config) -> Config. @@ -92,6 +106,10 @@ end_per_group(list_queues, Config0) -> rabbit_ct_helpers:run_steps(Config1, rabbit_ct_client_helpers:teardown_steps() ++ rabbit_ct_broker_helpers:teardown_steps()); +end_per_group(global_parameters, Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()); end_per_group(_, Config) -> Config. @@ -126,6 +144,75 @@ list_queues_offline(Config) -> assert_ctl_queues(Config, 1, ["--offline"], OfflineQueues), ok. +manage_global_parameters(Config) -> + 0 = length(global_parameters(Config)), + Parameter1Key = global_param1, + GlobalParameter1ValueAsString = "{\"a\":\"b\", \"c\":\"d\"}", + ok = control_action(Config, set_global_parameter, + [atom_to_list(Parameter1Key), + GlobalParameter1ValueAsString + ]), + + 1 = length(global_parameters(Config)), + + GlobalParameter1Value = rabbit_ct_broker_helpers:rpc( + Config, 0, + rabbit_runtime_parameters, value_global, + [Parameter1Key] + ), + + [{<<"a">>,<<"b">>}, {<<"c">>,<<"d">>}] = GlobalParameter1Value, + + Parameter2Key = global_param2, + GlobalParameter2ValueAsString = "{\"e\":\"f\", \"g\":\"h\"}", + ok = control_action(Config, set_global_parameter, + [atom_to_list(Parameter2Key), + GlobalParameter2ValueAsString + ]), + + 2 = length(global_parameters(Config)), + + GlobalParameter2Value = rabbit_ct_broker_helpers:rpc( + Config, 0, + rabbit_runtime_parameters, value_global, + [Parameter2Key] + ), + + [{<<"e">>,<<"f">>}, {<<"g">>,<<"h">>}] = GlobalParameter2Value, + + + GlobalParameter1Value2AsString = "{\"a\":\"z\", \"c\":\"d\"}", + ok = control_action(Config, set_global_parameter, + [atom_to_list(Parameter1Key), + GlobalParameter1Value2AsString + ]), + + 2 = length(global_parameters(Config)), + + GlobalParameter1Value2 = rabbit_ct_broker_helpers:rpc( + Config, 0, + rabbit_runtime_parameters, value_global, + [Parameter1Key] + ), + + [{<<"a">>,<<"z">>}, {<<"c">>,<<"d">>}] = GlobalParameter1Value2, + + ok = control_action(Config, clear_global_parameter, + [atom_to_list(Parameter1Key)] + ), + + 1 = length(global_parameters(Config)), + + not_found = rabbit_ct_broker_helpers:rpc( + Config, 0, + rabbit_runtime_parameters, value_global, + [Parameter1Key] + ), + + ok = control_action(Config, list_global_parameters, []), + + ok. + %%---------------------------------------------------------------------------- %% Helpers %%---------------------------------------------------------------------------- @@ -144,3 +231,17 @@ assert_ctl_queues(Config, Node, Args, Expected0) -> run_list_queues(Config, Node, Args) -> rabbit_ct_broker_helpers:rabbitmqctl_list(Config, Node, ["list_queues"] ++ Args ++ ["name"]). + +control_action(Config, Command, Args) -> + Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + rabbit_control_main:action( + Command, Node, Args, [], + fun (Format, Args1) -> + io:format(Format ++ " ...~n", Args1) + end). + +global_parameters(Config) -> + rabbit_ct_broker_helpers:rpc( + Config, 0, + rabbit_runtime_parameters, list_global, [] + ).
\ No newline at end of file diff --git a/test/worker_pool_SUITE.erl b/test/worker_pool_SUITE.erl new file mode 100644 index 0000000000..7eb4d6fd04 --- /dev/null +++ b/test/worker_pool_SUITE.erl @@ -0,0 +1,193 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(worker_pool_SUITE). + +-compile(export_all). +-include_lib("common_test/include/ct.hrl"). + + +-define(POOL_SIZE, 1). +-define(POOL_NAME, test_pool). + +all() -> + [ + run_code_synchronously, + run_code_asynchronously, + set_timeout, + cancel_timeout, + cancel_timeout_by_setting + ]. + +init_per_testcase(_, Config) -> + {ok, Pool} = worker_pool_sup:start_link(?POOL_SIZE, ?POOL_NAME), + rabbit_ct_helpers:set_config(Config, [{pool_sup, Pool}]). + +end_per_testcase(_, Config) -> + Pool = ?config(pool_sup, Config), + unlink(Pool), + exit(Pool, kill). + +run_code_synchronously(Config) -> + Self = self(), + Test = make_ref(), + Sleep = 200, + {Time, Result} = timer:tc(fun() -> + worker_pool:submit(?POOL_NAME, + fun() -> + timer:sleep(Sleep), + Self ! {hi, Test}, + self() + end, + reuse) + end), + % Worker run synchronously + true = Time > Sleep, + % Worker have sent message + receive {hi, Test} -> ok + after 0 -> error(no_message_from_worker) + end, + % Worker is a separate process + true = (Self /= Result). + +run_code_asynchronously(Config) -> + Self = self(), + Test = make_ref(), + Sleep = 200, + {Time, Result} = timer:tc(fun() -> + worker_pool:submit_async(?POOL_NAME, + fun() -> + timer:sleep(Sleep), + Self ! {hi, Test}, + self() + end) + end), + % Worker run synchronously + true = Time < Sleep, + % Worker have sent message + receive {hi, Test} -> ok + after Sleep + 100 -> error(no_message_from_worker) + end, + % Worker is a separate process + true = (Self /= Result). + +set_timeout(Config) -> + Self = self(), + Test = make_ref(), + Worker = worker_pool:submit(?POOL_NAME, + fun() -> + Worker = self(), + timer:sleep(100), + worker_pool_worker:set_timeout( + my_timeout, 1000, + fun() -> + Self ! {hello, self(), Test} + end), + Worker + end, + reuse), + + % Timeout will occur after 1000 ms only + receive {hello, Worker, Test} -> exit(timeout_should_wait) + after 0 -> ok + end, + + timer:sleep(1000), + + receive {hello, Worker, Test} -> ok + after 1000 -> exit(timeout_is_late) + end. + + +cancel_timeout(Config) -> + Self = self(), + Test = make_ref(), + Worker = worker_pool:submit(?POOL_NAME, + fun() -> + Worker = self(), + timer:sleep(100), + worker_pool_worker:set_timeout( + my_timeout, 1000, + fun() -> + Self ! {hello, self(), Test} + end), + Worker + end, + reuse), + + % Timeout will occur after 1000 ms only + receive {hello, Worker, Test} -> exit(timeout_should_wait) + after 0 -> ok + end, + + worker_pool_worker:next_job_from(Worker, Self), + Worker = worker_pool_worker:submit(Worker, + fun() -> + worker_pool_worker:clear_timeout(my_timeout), + Worker + end, + reuse), + + timer:sleep(1000), + receive {hello, Worker, Test} -> exit(timeout_is_not_canceleld) + after 0 -> ok + end. + +cancel_timeout_by_setting(Config) -> + Self = self(), + Test = make_ref(), + Worker = worker_pool:submit(?POOL_NAME, + fun() -> + Worker = self(), + timer:sleep(100), + worker_pool_worker:set_timeout( + my_timeout, 1000, + fun() -> + Self ! {hello, self(), Test} + end), + Worker + end, + reuse), + + % Timeout will occur after 1000 ms only + receive {hello, Worker, Test} -> exit(timeout_should_wait) + after 0 -> ok + end, + + worker_pool_worker:next_job_from(Worker, Self), + Worker = worker_pool_worker:submit(Worker, + fun() -> + worker_pool_worker:set_timeout(my_timeout, 1000, + fun() -> + Self ! {hello_reset, self(), Test} + end), + Worker + end, + reuse), + + timer:sleep(1000), + receive {hello, Worker, Test} -> exit(timeout_is_not_canceleld) + after 0 -> ok + end, + + receive {hello_reset, Worker, Test} -> ok + after 1000 -> exit(timeout_is_late) + end. + + + + + |
