diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2016-09-15 17:05:23 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2016-09-15 17:05:23 +0300 |
| commit | 0e60c60954378b4339ed77d1e5b71e07f0bac002 (patch) | |
| tree | 1ee03ed1fddf3c726422649b3d4b9a8233347664 | |
| parent | 6577b1041b5e3f0b4d8bf89593afdf3c5d23017a (diff) | |
| parent | 1b0096a925ba56af16d4776713a5c8e9593c587a (diff) | |
| download | rabbitmq-server-git-0e60c60954378b4339ed77d1e5b71e07f0bac002.tar.gz | |
Merge branch 'master' into rabbitmq-server-501
| -rwxr-xr-x | scripts/rabbitmq-env | 7 | ||||
| -rw-r--r-- | scripts/rabbitmq-env.bat | 10 | ||||
| -rwxr-xr-x | scripts/rabbitmq-server | 3 | ||||
| -rwxr-xr-x | scripts/rabbitmq-server-ha.ocf | 14 | ||||
| -rw-r--r-- | src/gm.erl | 7 | ||||
| -rw-r--r-- | src/rabbit.app.src | 3 | ||||
| -rw-r--r-- | src/rabbit_connection_tracking.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_connection_tracking_handler.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_direct.erl | 73 | ||||
| -rw-r--r-- | test/per_vhost_connection_limit_SUITE.erl | 82 |
10 files changed, 151 insertions, 63 deletions
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env index 1b9c6df4b3..206bdd0c20 100755 --- a/scripts/rabbitmq-env +++ b/scripts/rabbitmq-env @@ -65,8 +65,11 @@ RABBITMQ_HOME="$(rmq_realpath "${RABBITMQ_SCRIPTS_DIR}/..")" DEFAULT_SCHEDULER_BIND_TYPE="db" [ "x" = "x$RABBITMQ_SCHEDULER_BIND_TYPE" ] && RABBITMQ_SCHEDULER_BIND_TYPE=${DEFAULT_SCHEDULER_BIND_TYPE} +DEFAULT_DISTRIBUTION_BUFFER_SIZE=32000 +[ "x" = "x$RABBITMQ_DISTRIBUTION_BUFFER_SIZE" ] && RABBITMQ_DISTRIBUTION_BUFFER_SIZE=${DEFAULT_DISTRIBUTION_BUFFER_SIZE} + ## Common defaults -SERVER_ERL_ARGS="+P 1048576 +t 5000000 +stbt $RABBITMQ_SCHEDULER_BIND_TYPE " +SERVER_ERL_ARGS="+P 1048576 +t 5000000 +stbt $RABBITMQ_SCHEDULER_BIND_TYPE +zdbbl $RABBITMQ_DISTRIBUTION_BUFFER_SIZE" # We save the current value of $RABBITMQ_PID_FILE in case it was set by # an init script. If $CONF_ENV_FILE overrides it again, we must ignore @@ -186,6 +189,8 @@ DEFAULT_NODE_PORT=5672 [ "x" = "x$RABBITMQ_GENERATED_CONFIG_DIR" ] && RABBITMQ_GENERATED_CONFIG_DIR=${GENERATED_CONFIG_DIR} [ "x" = "x$RABBITMQ_ADVANCED_CONFIG_FILE" ] && RABBITMQ_ADVANCED_CONFIG_FILE=${ADVANCED_CONFIG_FILE} [ "x" = "x$RABBITMQ_SCHEMA_DIR" ] && RABBITMQ_SCHEMA_DIR=${SCHEMA_DIR} +[ "x" = "x$RABBITMQ_IGNORE_SIGINT" ] && RABBITMQ_IGNORE_SIGINT="true" +[ "xtrue" = "x$RABBITMQ_IGNORE_SIGINT" ] && RABBITMQ_IGNORE_SIGINT_FLAG="+B i" rmq_normalize_path_var \ RABBITMQ_CONFIG_FILE \ diff --git a/scripts/rabbitmq-env.bat b/scripts/rabbitmq-env.bat index a1576e9b27..56b2f69b2d 100644 --- a/scripts/rabbitmq-env.bat +++ b/scripts/rabbitmq-env.bat @@ -38,6 +38,14 @@ if "!RABBITMQ_SCHEDULER_BIND_TYPE!"=="" ( set RABBITMQ_SCHEDULER_BIND_TYPE=!DEFAULT_SCHEDULER_BIND_TYPE!
)
+REM DEFAULT_DISTRIBUTION_BUFFER_SIZE=32000
+REM set the VM distribution buffer size
+REM [ "x" = "x$RABBITMQ_DISTRIBUTION_BUFFER_SIZE" ] && RABBITMQ_DISTRIBUTION_BUFFER_SIZE=${DEFAULT_DISTRIBUTION_BUFFER_SIZE}
+set DEFAULT_DISTRIBUTION_BUFFER_SIZE=32000
+if "!RABBITMQ_DISTRIBUTION_BUFFER_SIZE!"=="" (
+ set RABBITMQ_DISTRIBUTION_BUFFER_SIZE=!DEFAULT_DISTRIBUTION_BUFFER_SIZE!
+)
+
REM # warn about old rabbitmq.conf file, if no new one
REM if [ -f /etc/rabbitmq/rabbitmq.conf ] && \
REM [ ! -f ${CONF_ENV_FILE} ] ; then
@@ -46,7 +54,7 @@ REM echo "location has moved to ${CONF_ENV_FILE}" REM fi
REM Common defaults
-set SERVER_ERL_ARGS=+P 1048576 +t 5000000 +stbt !RABBITMQ_SCHEDULER_BIND_TYPE!
+set SERVER_ERL_ARGS=+P 1048576 +t 5000000 +stbt !RABBITMQ_SCHEDULER_BIND_TYPE! +zdbbl !RABBITMQ_DISTRIBUTION_BUFFER_SIZE!
REM ## Get configuration variables from the configure environment file
REM [ -f ${CONF_ENV_FILE} ] && . ${CONF_ENV_FILE} || true
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index a0500ebb87..48365252e5 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -262,9 +262,8 @@ else # When RabbitMQ runs in the foreground but the Erlang shell is # disabled, we setup signal handlers to stop RabbitMQ properly. This # is at least useful in the case of Docker. - # The Erlang VM should ignore SIGINT. - RABBITMQ_SERVER_START_ARGS="${RABBITMQ_SERVER_START_ARGS} +B i" + RABBITMQ_SERVER_START_ARGS="${RABBITMQ_SERVER_START_ARGS} ${RABBITMQ_IGNORE_SIGINT_FLAG}" # Signal handlers. They all stop RabbitMQ properly (using # rabbitmqctl stop). Depending on the signal, this script will exit diff --git a/scripts/rabbitmq-server-ha.ocf b/scripts/rabbitmq-server-ha.ocf index 63e5f700d9..8f711b3473 100755 --- a/scripts/rabbitmq-server-ha.ocf +++ b/scripts/rabbitmq-server-ha.ocf @@ -39,6 +39,7 @@ OCF_RESKEY_definitions_dump_file_default="/etc/rabbitmq/definitions" OCF_RESKEY_pid_file_default="/var/run/rabbitmq/pid" OCF_RESKEY_log_dir_default="/var/log/rabbitmq" OCF_RESKEY_mnesia_base_default="/var/lib/rabbitmq/mnesia" +OCF_RESKEY_mnesia_schema_base_default="/var/lib/rabbitmq" OCF_RESKEY_host_ip_default="127.0.0.1" OCF_RESKEY_node_port_default=5672 OCF_RESKEY_erlang_cookie_default=false @@ -62,6 +63,7 @@ OCF_RESKEY_rmq_feature_local_list_queues_default=true : ${OCF_RESKEY_definitions_dump_file=${OCF_RESKEY_definitions_dump_file_default}} : ${OCF_RESKEY_log_dir=${OCF_RESKEY_log_dir_default}} : ${OCF_RESKEY_mnesia_base=${OCF_RESKEY_mnesia_base_default}} +: ${OCF_RESKEY_mnesia_schema_base=${OCF_RESKEY_mnesia_schema_base_default}} : ${OCF_RESKEY_pid_file=${OCF_RESKEY_pid_file_default}} : ${OCF_RESKEY_node_port=${OCF_RESKEY_node_port_default}} : ${OCF_RESKEY_erlang_cookie=${OCF_RESKEY_erlang_cookie_default}} @@ -234,6 +236,14 @@ Base directory for storing Mnesia files <content type="boolean" default="${OCF_RESKEY_mnesia_base_default}" /> </parameter> +<parameter name="mnesia_schema_base" unique="0" required="0"> +<longdesc lang="en"> +Parent directory for Mnesia schema directory +</longdesc> +<shortdesc lang="en">Parent directory for Mnesia schema directory</shortdesc> +<content type="string" default="${OCF_RESKEY_mnesia_schema_base_default}" /> +</parameter> + <parameter name="host_ip" unique="0" required="0"> <longdesc lang="en"> ${OCF_RESKEY_binary} should listen on this IP address @@ -711,7 +721,9 @@ reset_mnesia() { if $make_amnesia ; then kill_rmq_and_remove_pid ocf_run rm -rf "${MNESIA_FILES}" - ocf_log warn "${LH} Mnesia files appear corrupted and have been removed from ${MNESIA_FILES}." + mnesia_schema_location="${OCF_RESKEY_mnesia_schema_base}/Mnesia.$(rabbit_node_name $(get_hostname))" + ocf_run rm -rf "$mnesia_schema_location" + ocf_log warn "${LH} Mnesia files appear corrupted and have been removed from ${MNESIA_FILES} and $mnesia_schema_location" fi # always return OCF SUCCESS return $OCF_SUCCESS diff --git a/src/gm.erl b/src/gm.erl index 0b5c1c44c4..aef23c7269 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -1137,7 +1137,7 @@ record_dead_member_in_group(Self, Member, GroupName, TxnFun, Verify) -> true -> check_membership(Self, read_group(GroupName)); false -> - read_group(GroupName) + check_group(read_group(GroupName)) end, case lists:splitwith( fun (Member1) -> Member1 =/= Member end, Members) of @@ -1615,3 +1615,8 @@ check_membership(GroupName) -> {error, not_found} -> throw(lost_membership) end. + +check_group({error, not_found}) -> + throw(lost_membership); +check_group(Any) -> + Any. diff --git a/src/rabbit.app.src b/src/rabbit.app.src index 738a38e2bb..2e17bbbc3c 100644 --- a/src/rabbit.app.src +++ b/src/rabbit.app.src @@ -98,7 +98,8 @@ {msg_store_credit_disc_bound, {2000, 500}}, {msg_store_io_batch_size, 2048}, %% see rabbitmq-server#143 - {credit_flow_default_credit, {200, 50}}, + %% and rabbitmq-server#949 + {credit_flow_default_credit, {200, 100}}, %% see rabbitmq-server#248 %% and rabbitmq-server#667 {channel_operation_timeout, 15000} diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl index ab945abc2f..b1d34ca516 100644 --- a/src/rabbit_connection_tracking.erl +++ b/src/rabbit_connection_tracking.erl @@ -315,6 +315,7 @@ tracked_connection_from_connection_created(EventDetails) -> username = pget(user, EventDetails), connected_at = pget(connected_at, EventDetails), pid = pget(pid, EventDetails), + type = pget(type, EventDetails), peer_host = pget(peer_host, EventDetails), peer_port = pget(peer_port, EventDetails)}. @@ -333,5 +334,6 @@ tracked_connection_from_connection_state(#connection{ {user, Username}, {connected_at, Ts}, {pid, self()}, + {type, network}, {peer_port, PeerPort}, {peer_host, PeerHost}]). diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl index fd1df8c88a..598fe686c3 100644 --- a/src/rabbit_connection_tracking_handler.erl +++ b/src/rabbit_connection_tracking_handler.erl @@ -77,9 +77,9 @@ handle_event(#event{type = connection_closed, props = Details}, State) -> handle_event(#event{type = vhost_deleted, props = Details}, State) -> VHost = pget(name, Details), rabbit_log_connection:info("Closing all connections in vhost '~s' because it's being deleted", [VHost]), - [rabbit_networking:close_connection(Pid, rabbit_misc:format("vhost '~s' is deleted", [VHost])) || - #tracked_connection{pid = Pid} <- rabbit_connection_tracking:list(VHost)], - {ok, State}; + [close_connection(Conn, rabbit_misc:format("vhost '~s' is deleted", [VHost])) + || Conn <- rabbit_connection_tracking:list(VHost)], + {ok, State}; handle_event(#event{type = user_deleted, props = Details}, State) -> _Username = pget(name, Details), %% TODO: force close and unregister connections from @@ -106,3 +106,10 @@ terminate(_Arg, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. + +close_connection(#tracked_connection{pid = Pid, type = network}, Message) -> + rabbit_networking:close_connection(Pid, Message); +close_connection(#tracked_connection{pid = Pid, type = direct}, Message) -> + %% Do an RPC call to the node running the direct client. + Node = node(Pid), + rpc:call(Node, amqp_direct_connection, server_close, [Pid, 320, Message]). diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 061105c150..858681ecfd 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -65,35 +65,64 @@ list() -> %%---------------------------------------------------------------------------- -connect({none, _}, VHost, Protocol, Pid, Infos) -> - connect0(fun () -> {ok, rabbit_auth_backend_dummy:user()} end, - VHost, Protocol, Pid, Infos); +auth_fun({none, _}, _VHost) -> + fun () -> {ok, rabbit_auth_backend_dummy:user()} end; -connect({Username, none}, VHost, Protocol, Pid, Infos) -> - connect0(fun () -> rabbit_access_control:check_user_login(Username, []) end, - VHost, Protocol, Pid, Infos); +auth_fun({Username, none}, _VHost) -> + fun () -> rabbit_access_control:check_user_login(Username, []) end; -connect({Username, Password}, VHost, Protocol, Pid, Infos) -> - connect0(fun () -> rabbit_access_control:check_user_login( - Username, [{password, Password}, {vhost, VHost}]) end, - VHost, Protocol, Pid, Infos). +auth_fun({Username, Password}, VHost) -> + fun () -> + rabbit_access_control:check_user_login( + Username, + [{password, Password}, {vhost, VHost}]) + end. -connect0(AuthFun, VHost, Protocol, Pid, Infos) -> +connect(Creds, VHost, Protocol, Pid, Infos) -> + AuthFun = auth_fun(Creds, VHost), case rabbit:is_running() of - true -> case AuthFun() of - {ok, User = #user{username = Username}} -> - notify_auth_result(Username, - user_authentication_success, []), - connect1(User, VHost, Protocol, Pid, Infos); - {refused, Username, Msg, Args} -> - notify_auth_result(Username, - user_authentication_failure, - [{error, rabbit_misc:format(Msg, Args)}]), - {error, {auth_failure, "Refused"}} - end; + true -> + case is_over_connection_limit(VHost, Creds, Pid) of + true -> + {error, not_allowed}; + false -> + case AuthFun() of + {ok, User = #user{username = Username}} -> + notify_auth_result(Username, + user_authentication_success, []), + connect1(User, VHost, Protocol, Pid, Infos); + {refused, Username, Msg, Args} -> + notify_auth_result(Username, + user_authentication_failure, + [{error, rabbit_misc:format(Msg, Args)}]), + {error, {auth_failure, "Refused"}} + end + end; false -> {error, broker_not_found_on_node} end. +is_over_connection_limit(VHost, {Username, _Password}, Pid) -> + PrintedUsername = case Username of + none -> ""; + _ -> Username + end, + try rabbit_connection_tracking:is_over_connection_limit(VHost) of + false -> false; + {true, Limit} -> + rabbit_log_connection:error( + "Error on Direct connection ~p~n" + "access to vhost '~s' refused for user '~s': " + "connection limit (~p) is reached", + [Pid, VHost, PrintedUsername, Limit]), + true + catch + throw:{error, {no_such_vhost, VHost}} -> + rabbit_log_connection:error( + "Error on Direct connection ~p~n" + "vhost ~s not found", [Pid, VHost]), + true + end. + notify_auth_result(Username, AuthResult, ExtraProps) -> EventProps = [{connection_type, direct}, {name, case Username of none -> ''; _ -> Username end}] ++ diff --git a/test/per_vhost_connection_limit_SUITE.erl b/test/per_vhost_connection_limit_SUITE.erl index 4fae129a35..592e57c41a 100644 --- a/test/per_vhost_connection_limit_SUITE.erl +++ b/test/per_vhost_connection_limit_SUITE.erl @@ -24,35 +24,41 @@ all() -> [ - {group, cluster_size_1}, - {group, cluster_size_2} + {group, cluster_size_1_network}, + {group, cluster_size_2_network}, + {group, cluster_size_1_direct}, + {group, cluster_size_2_direct} ]. groups() -> + ClusterSize1Tests = [ + most_basic_single_node_connection_count, + single_node_single_vhost_connection_count, + single_node_multiple_vhosts_connection_count, + single_node_list_in_vhost, + single_node_single_vhost_limit, + single_node_single_vhost_zero_limit, + single_node_multiple_vhosts_limit, + single_node_multiple_vhosts_zero_limit, + single_node_vhost_deletion_forces_connection_closure + ], + ClusterSize2Tests = [ + most_basic_cluster_connection_count, + cluster_single_vhost_connection_count, + cluster_multiple_vhosts_connection_count, + cluster_node_restart_connection_count, + cluster_node_list_on_node, + cluster_single_vhost_limit, + cluster_single_vhost_limit2, + cluster_single_vhost_zero_limit, + cluster_multiple_vhosts_zero_limit, + cluster_vhost_deletion_forces_connection_closure + ], [ - {cluster_size_1, [], [ - most_basic_single_node_connection_count, - single_node_single_vhost_connection_count, - single_node_multiple_vhosts_connection_count, - single_node_list_in_vhost, - single_node_single_vhost_limit, - single_node_single_vhost_zero_limit, - single_node_multiple_vhosts_limit, - single_node_multiple_vhosts_zero_limit, - single_node_vhost_deletion_forces_connection_closure - ]}, - {cluster_size_2, [], [ - most_basic_cluster_connection_count, - cluster_single_vhost_connection_count, - cluster_multiple_vhosts_connection_count, - cluster_node_restart_connection_count, - cluster_node_list_on_node, - cluster_single_vhost_limit, - cluster_single_vhost_limit2, - cluster_single_vhost_zero_limit, - cluster_multiple_vhosts_zero_limit, - cluster_vhost_deletion_forces_connection_closure - ]}, + {cluster_size_1_network, [], ClusterSize1Tests}, + {cluster_size_2_network, [], ClusterSize2Tests}, + {cluster_size_1_direct, [], ClusterSize1Tests}, + {cluster_size_2_direct, [], ClusterSize2Tests}, {cluster_rename, [], [ vhost_limit_after_node_renamed ]} @@ -80,10 +86,19 @@ init_per_suite(Config) -> end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). -init_per_group(cluster_size_1, Config) -> - init_per_multinode_group(cluster_size_1, Config, 1); -init_per_group(cluster_size_2, Config) -> - init_per_multinode_group(cluster_size_2, Config, 2); +init_per_group(cluster_size_1_network, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, network}]), + init_per_multinode_group(cluster_size_1_network, Config1, 1); +init_per_group(cluster_size_2_network, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, network}]), + init_per_multinode_group(cluster_size_2_network, Config1, 2); +init_per_group(cluster_size_1_direct, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, direct}]), + init_per_multinode_group(cluster_size_1_direct, Config1, 1); +init_per_group(cluster_size_2_direct, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, direct}]), + init_per_multinode_group(cluster_size_2_direct, Config1, 2); + init_per_group(cluster_rename, Config) -> init_per_multinode_group(cluster_rename, Config, 2). @@ -706,12 +721,17 @@ vhost_limit_after_node_renamed(Config) -> %% ------------------------------------------------------------------- open_connections(Config, NodesAndVHosts) -> + % Randomly select connection type + OpenConnectionFun = case ?config(connection_type, Config) of + network -> open_unmanaged_connection; + direct -> open_unmanaged_connection_direct + end, Conns = lists:map(fun ({Node, VHost}) -> - rabbit_ct_client_helpers:open_unmanaged_connection(Config, Node, + rabbit_ct_client_helpers:OpenConnectionFun(Config, Node, VHost); (Node) -> - rabbit_ct_client_helpers:open_unmanaged_connection(Config, Node) + rabbit_ct_client_helpers:OpenConnectionFun(Config, Node) end, NodesAndVHosts), timer:sleep(500), Conns. |
