summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2016-09-15 17:05:23 +0300
committerMichael Klishin <michael@clojurewerkz.org>2016-09-15 17:05:23 +0300
commit0e60c60954378b4339ed77d1e5b71e07f0bac002 (patch)
tree1ee03ed1fddf3c726422649b3d4b9a8233347664
parent6577b1041b5e3f0b4d8bf89593afdf3c5d23017a (diff)
parent1b0096a925ba56af16d4776713a5c8e9593c587a (diff)
downloadrabbitmq-server-git-0e60c60954378b4339ed77d1e5b71e07f0bac002.tar.gz
Merge branch 'master' into rabbitmq-server-501
-rwxr-xr-xscripts/rabbitmq-env7
-rw-r--r--scripts/rabbitmq-env.bat10
-rwxr-xr-xscripts/rabbitmq-server3
-rwxr-xr-xscripts/rabbitmq-server-ha.ocf14
-rw-r--r--src/gm.erl7
-rw-r--r--src/rabbit.app.src3
-rw-r--r--src/rabbit_connection_tracking.erl2
-rw-r--r--src/rabbit_connection_tracking_handler.erl13
-rw-r--r--src/rabbit_direct.erl73
-rw-r--r--test/per_vhost_connection_limit_SUITE.erl82
10 files changed, 151 insertions, 63 deletions
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index 1b9c6df4b3..206bdd0c20 100755
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -65,8 +65,11 @@ RABBITMQ_HOME="$(rmq_realpath "${RABBITMQ_SCRIPTS_DIR}/..")"
DEFAULT_SCHEDULER_BIND_TYPE="db"
[ "x" = "x$RABBITMQ_SCHEDULER_BIND_TYPE" ] && RABBITMQ_SCHEDULER_BIND_TYPE=${DEFAULT_SCHEDULER_BIND_TYPE}
+DEFAULT_DISTRIBUTION_BUFFER_SIZE=32000
+[ "x" = "x$RABBITMQ_DISTRIBUTION_BUFFER_SIZE" ] && RABBITMQ_DISTRIBUTION_BUFFER_SIZE=${DEFAULT_DISTRIBUTION_BUFFER_SIZE}
+
## Common defaults
-SERVER_ERL_ARGS="+P 1048576 +t 5000000 +stbt $RABBITMQ_SCHEDULER_BIND_TYPE "
+SERVER_ERL_ARGS="+P 1048576 +t 5000000 +stbt $RABBITMQ_SCHEDULER_BIND_TYPE +zdbbl $RABBITMQ_DISTRIBUTION_BUFFER_SIZE"
# We save the current value of $RABBITMQ_PID_FILE in case it was set by
# an init script. If $CONF_ENV_FILE overrides it again, we must ignore
@@ -186,6 +189,8 @@ DEFAULT_NODE_PORT=5672
[ "x" = "x$RABBITMQ_GENERATED_CONFIG_DIR" ] && RABBITMQ_GENERATED_CONFIG_DIR=${GENERATED_CONFIG_DIR}
[ "x" = "x$RABBITMQ_ADVANCED_CONFIG_FILE" ] && RABBITMQ_ADVANCED_CONFIG_FILE=${ADVANCED_CONFIG_FILE}
[ "x" = "x$RABBITMQ_SCHEMA_DIR" ] && RABBITMQ_SCHEMA_DIR=${SCHEMA_DIR}
+[ "x" = "x$RABBITMQ_IGNORE_SIGINT" ] && RABBITMQ_IGNORE_SIGINT="true"
+[ "xtrue" = "x$RABBITMQ_IGNORE_SIGINT" ] && RABBITMQ_IGNORE_SIGINT_FLAG="+B i"
rmq_normalize_path_var \
RABBITMQ_CONFIG_FILE \
diff --git a/scripts/rabbitmq-env.bat b/scripts/rabbitmq-env.bat
index a1576e9b27..56b2f69b2d 100644
--- a/scripts/rabbitmq-env.bat
+++ b/scripts/rabbitmq-env.bat
@@ -38,6 +38,14 @@ if "!RABBITMQ_SCHEDULER_BIND_TYPE!"=="" (
set RABBITMQ_SCHEDULER_BIND_TYPE=!DEFAULT_SCHEDULER_BIND_TYPE!
)
+REM DEFAULT_DISTRIBUTION_BUFFER_SIZE=32000
+REM set the VM distribution buffer size
+REM [ "x" = "x$RABBITMQ_DISTRIBUTION_BUFFER_SIZE" ] && RABBITMQ_DISTRIBUTION_BUFFER_SIZE=${DEFAULT_DISTRIBUTION_BUFFER_SIZE}
+set DEFAULT_DISTRIBUTION_BUFFER_SIZE=32000
+if "!RABBITMQ_DISTRIBUTION_BUFFER_SIZE!"=="" (
+ set RABBITMQ_DISTRIBUTION_BUFFER_SIZE=!DEFAULT_DISTRIBUTION_BUFFER_SIZE!
+)
+
REM # warn about old rabbitmq.conf file, if no new one
REM if [ -f /etc/rabbitmq/rabbitmq.conf ] && \
REM [ ! -f ${CONF_ENV_FILE} ] ; then
@@ -46,7 +54,7 @@ REM echo "location has moved to ${CONF_ENV_FILE}"
REM fi
REM Common defaults
-set SERVER_ERL_ARGS=+P 1048576 +t 5000000 +stbt !RABBITMQ_SCHEDULER_BIND_TYPE!
+set SERVER_ERL_ARGS=+P 1048576 +t 5000000 +stbt !RABBITMQ_SCHEDULER_BIND_TYPE! +zdbbl !RABBITMQ_DISTRIBUTION_BUFFER_SIZE!
REM ## Get configuration variables from the configure environment file
REM [ -f ${CONF_ENV_FILE} ] && . ${CONF_ENV_FILE} || true
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index a0500ebb87..48365252e5 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -262,9 +262,8 @@ else
# When RabbitMQ runs in the foreground but the Erlang shell is
# disabled, we setup signal handlers to stop RabbitMQ properly. This
# is at least useful in the case of Docker.
-
# The Erlang VM should ignore SIGINT.
- RABBITMQ_SERVER_START_ARGS="${RABBITMQ_SERVER_START_ARGS} +B i"
+ RABBITMQ_SERVER_START_ARGS="${RABBITMQ_SERVER_START_ARGS} ${RABBITMQ_IGNORE_SIGINT_FLAG}"
# Signal handlers. They all stop RabbitMQ properly (using
# rabbitmqctl stop). Depending on the signal, this script will exit
diff --git a/scripts/rabbitmq-server-ha.ocf b/scripts/rabbitmq-server-ha.ocf
index 63e5f700d9..8f711b3473 100755
--- a/scripts/rabbitmq-server-ha.ocf
+++ b/scripts/rabbitmq-server-ha.ocf
@@ -39,6 +39,7 @@ OCF_RESKEY_definitions_dump_file_default="/etc/rabbitmq/definitions"
OCF_RESKEY_pid_file_default="/var/run/rabbitmq/pid"
OCF_RESKEY_log_dir_default="/var/log/rabbitmq"
OCF_RESKEY_mnesia_base_default="/var/lib/rabbitmq/mnesia"
+OCF_RESKEY_mnesia_schema_base_default="/var/lib/rabbitmq"
OCF_RESKEY_host_ip_default="127.0.0.1"
OCF_RESKEY_node_port_default=5672
OCF_RESKEY_erlang_cookie_default=false
@@ -62,6 +63,7 @@ OCF_RESKEY_rmq_feature_local_list_queues_default=true
: ${OCF_RESKEY_definitions_dump_file=${OCF_RESKEY_definitions_dump_file_default}}
: ${OCF_RESKEY_log_dir=${OCF_RESKEY_log_dir_default}}
: ${OCF_RESKEY_mnesia_base=${OCF_RESKEY_mnesia_base_default}}
+: ${OCF_RESKEY_mnesia_schema_base=${OCF_RESKEY_mnesia_schema_base_default}}
: ${OCF_RESKEY_pid_file=${OCF_RESKEY_pid_file_default}}
: ${OCF_RESKEY_node_port=${OCF_RESKEY_node_port_default}}
: ${OCF_RESKEY_erlang_cookie=${OCF_RESKEY_erlang_cookie_default}}
@@ -234,6 +236,14 @@ Base directory for storing Mnesia files
<content type="boolean" default="${OCF_RESKEY_mnesia_base_default}" />
</parameter>
+<parameter name="mnesia_schema_base" unique="0" required="0">
+<longdesc lang="en">
+Parent directory for Mnesia schema directory
+</longdesc>
+<shortdesc lang="en">Parent directory for Mnesia schema directory</shortdesc>
+<content type="string" default="${OCF_RESKEY_mnesia_schema_base_default}" />
+</parameter>
+
<parameter name="host_ip" unique="0" required="0">
<longdesc lang="en">
${OCF_RESKEY_binary} should listen on this IP address
@@ -711,7 +721,9 @@ reset_mnesia() {
if $make_amnesia ; then
kill_rmq_and_remove_pid
ocf_run rm -rf "${MNESIA_FILES}"
- ocf_log warn "${LH} Mnesia files appear corrupted and have been removed from ${MNESIA_FILES}."
+ mnesia_schema_location="${OCF_RESKEY_mnesia_schema_base}/Mnesia.$(rabbit_node_name $(get_hostname))"
+ ocf_run rm -rf "$mnesia_schema_location"
+ ocf_log warn "${LH} Mnesia files appear corrupted and have been removed from ${MNESIA_FILES} and $mnesia_schema_location"
fi
# always return OCF SUCCESS
return $OCF_SUCCESS
diff --git a/src/gm.erl b/src/gm.erl
index 0b5c1c44c4..aef23c7269 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -1137,7 +1137,7 @@ record_dead_member_in_group(Self, Member, GroupName, TxnFun, Verify) ->
true ->
check_membership(Self, read_group(GroupName));
false ->
- read_group(GroupName)
+ check_group(read_group(GroupName))
end,
case lists:splitwith(
fun (Member1) -> Member1 =/= Member end, Members) of
@@ -1615,3 +1615,8 @@ check_membership(GroupName) ->
{error, not_found} ->
throw(lost_membership)
end.
+
+check_group({error, not_found}) ->
+ throw(lost_membership);
+check_group(Any) ->
+ Any.
diff --git a/src/rabbit.app.src b/src/rabbit.app.src
index 738a38e2bb..2e17bbbc3c 100644
--- a/src/rabbit.app.src
+++ b/src/rabbit.app.src
@@ -98,7 +98,8 @@
{msg_store_credit_disc_bound, {2000, 500}},
{msg_store_io_batch_size, 2048},
%% see rabbitmq-server#143
- {credit_flow_default_credit, {200, 50}},
+ %% and rabbitmq-server#949
+ {credit_flow_default_credit, {200, 100}},
%% see rabbitmq-server#248
%% and rabbitmq-server#667
{channel_operation_timeout, 15000}
diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl
index ab945abc2f..b1d34ca516 100644
--- a/src/rabbit_connection_tracking.erl
+++ b/src/rabbit_connection_tracking.erl
@@ -315,6 +315,7 @@ tracked_connection_from_connection_created(EventDetails) ->
username = pget(user, EventDetails),
connected_at = pget(connected_at, EventDetails),
pid = pget(pid, EventDetails),
+ type = pget(type, EventDetails),
peer_host = pget(peer_host, EventDetails),
peer_port = pget(peer_port, EventDetails)}.
@@ -333,5 +334,6 @@ tracked_connection_from_connection_state(#connection{
{user, Username},
{connected_at, Ts},
{pid, self()},
+ {type, network},
{peer_port, PeerPort},
{peer_host, PeerHost}]).
diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl
index fd1df8c88a..598fe686c3 100644
--- a/src/rabbit_connection_tracking_handler.erl
+++ b/src/rabbit_connection_tracking_handler.erl
@@ -77,9 +77,9 @@ handle_event(#event{type = connection_closed, props = Details}, State) ->
handle_event(#event{type = vhost_deleted, props = Details}, State) ->
VHost = pget(name, Details),
rabbit_log_connection:info("Closing all connections in vhost '~s' because it's being deleted", [VHost]),
- [rabbit_networking:close_connection(Pid, rabbit_misc:format("vhost '~s' is deleted", [VHost])) ||
- #tracked_connection{pid = Pid} <- rabbit_connection_tracking:list(VHost)],
- {ok, State};
+ [close_connection(Conn, rabbit_misc:format("vhost '~s' is deleted", [VHost]))
+ || Conn <- rabbit_connection_tracking:list(VHost)],
+ {ok, State};
handle_event(#event{type = user_deleted, props = Details}, State) ->
_Username = pget(name, Details),
%% TODO: force close and unregister connections from
@@ -106,3 +106,10 @@ terminate(_Arg, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+
+close_connection(#tracked_connection{pid = Pid, type = network}, Message) ->
+ rabbit_networking:close_connection(Pid, Message);
+close_connection(#tracked_connection{pid = Pid, type = direct}, Message) ->
+ %% Do an RPC call to the node running the direct client.
+ Node = node(Pid),
+ rpc:call(Node, amqp_direct_connection, server_close, [Pid, 320, Message]).
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 061105c150..858681ecfd 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -65,35 +65,64 @@ list() ->
%%----------------------------------------------------------------------------
-connect({none, _}, VHost, Protocol, Pid, Infos) ->
- connect0(fun () -> {ok, rabbit_auth_backend_dummy:user()} end,
- VHost, Protocol, Pid, Infos);
+auth_fun({none, _}, _VHost) ->
+ fun () -> {ok, rabbit_auth_backend_dummy:user()} end;
-connect({Username, none}, VHost, Protocol, Pid, Infos) ->
- connect0(fun () -> rabbit_access_control:check_user_login(Username, []) end,
- VHost, Protocol, Pid, Infos);
+auth_fun({Username, none}, _VHost) ->
+ fun () -> rabbit_access_control:check_user_login(Username, []) end;
-connect({Username, Password}, VHost, Protocol, Pid, Infos) ->
- connect0(fun () -> rabbit_access_control:check_user_login(
- Username, [{password, Password}, {vhost, VHost}]) end,
- VHost, Protocol, Pid, Infos).
+auth_fun({Username, Password}, VHost) ->
+ fun () ->
+ rabbit_access_control:check_user_login(
+ Username,
+ [{password, Password}, {vhost, VHost}])
+ end.
-connect0(AuthFun, VHost, Protocol, Pid, Infos) ->
+connect(Creds, VHost, Protocol, Pid, Infos) ->
+ AuthFun = auth_fun(Creds, VHost),
case rabbit:is_running() of
- true -> case AuthFun() of
- {ok, User = #user{username = Username}} ->
- notify_auth_result(Username,
- user_authentication_success, []),
- connect1(User, VHost, Protocol, Pid, Infos);
- {refused, Username, Msg, Args} ->
- notify_auth_result(Username,
- user_authentication_failure,
- [{error, rabbit_misc:format(Msg, Args)}]),
- {error, {auth_failure, "Refused"}}
- end;
+ true ->
+ case is_over_connection_limit(VHost, Creds, Pid) of
+ true ->
+ {error, not_allowed};
+ false ->
+ case AuthFun() of
+ {ok, User = #user{username = Username}} ->
+ notify_auth_result(Username,
+ user_authentication_success, []),
+ connect1(User, VHost, Protocol, Pid, Infos);
+ {refused, Username, Msg, Args} ->
+ notify_auth_result(Username,
+ user_authentication_failure,
+ [{error, rabbit_misc:format(Msg, Args)}]),
+ {error, {auth_failure, "Refused"}}
+ end
+ end;
false -> {error, broker_not_found_on_node}
end.
+is_over_connection_limit(VHost, {Username, _Password}, Pid) ->
+ PrintedUsername = case Username of
+ none -> "";
+ _ -> Username
+ end,
+ try rabbit_connection_tracking:is_over_connection_limit(VHost) of
+ false -> false;
+ {true, Limit} ->
+ rabbit_log_connection:error(
+ "Error on Direct connection ~p~n"
+ "access to vhost '~s' refused for user '~s': "
+ "connection limit (~p) is reached",
+ [Pid, VHost, PrintedUsername, Limit]),
+ true
+ catch
+ throw:{error, {no_such_vhost, VHost}} ->
+ rabbit_log_connection:error(
+ "Error on Direct connection ~p~n"
+ "vhost ~s not found", [Pid, VHost]),
+ true
+ end.
+
notify_auth_result(Username, AuthResult, ExtraProps) ->
EventProps = [{connection_type, direct},
{name, case Username of none -> ''; _ -> Username end}] ++
diff --git a/test/per_vhost_connection_limit_SUITE.erl b/test/per_vhost_connection_limit_SUITE.erl
index 4fae129a35..592e57c41a 100644
--- a/test/per_vhost_connection_limit_SUITE.erl
+++ b/test/per_vhost_connection_limit_SUITE.erl
@@ -24,35 +24,41 @@
all() ->
[
- {group, cluster_size_1},
- {group, cluster_size_2}
+ {group, cluster_size_1_network},
+ {group, cluster_size_2_network},
+ {group, cluster_size_1_direct},
+ {group, cluster_size_2_direct}
].
groups() ->
+ ClusterSize1Tests = [
+ most_basic_single_node_connection_count,
+ single_node_single_vhost_connection_count,
+ single_node_multiple_vhosts_connection_count,
+ single_node_list_in_vhost,
+ single_node_single_vhost_limit,
+ single_node_single_vhost_zero_limit,
+ single_node_multiple_vhosts_limit,
+ single_node_multiple_vhosts_zero_limit,
+ single_node_vhost_deletion_forces_connection_closure
+ ],
+ ClusterSize2Tests = [
+ most_basic_cluster_connection_count,
+ cluster_single_vhost_connection_count,
+ cluster_multiple_vhosts_connection_count,
+ cluster_node_restart_connection_count,
+ cluster_node_list_on_node,
+ cluster_single_vhost_limit,
+ cluster_single_vhost_limit2,
+ cluster_single_vhost_zero_limit,
+ cluster_multiple_vhosts_zero_limit,
+ cluster_vhost_deletion_forces_connection_closure
+ ],
[
- {cluster_size_1, [], [
- most_basic_single_node_connection_count,
- single_node_single_vhost_connection_count,
- single_node_multiple_vhosts_connection_count,
- single_node_list_in_vhost,
- single_node_single_vhost_limit,
- single_node_single_vhost_zero_limit,
- single_node_multiple_vhosts_limit,
- single_node_multiple_vhosts_zero_limit,
- single_node_vhost_deletion_forces_connection_closure
- ]},
- {cluster_size_2, [], [
- most_basic_cluster_connection_count,
- cluster_single_vhost_connection_count,
- cluster_multiple_vhosts_connection_count,
- cluster_node_restart_connection_count,
- cluster_node_list_on_node,
- cluster_single_vhost_limit,
- cluster_single_vhost_limit2,
- cluster_single_vhost_zero_limit,
- cluster_multiple_vhosts_zero_limit,
- cluster_vhost_deletion_forces_connection_closure
- ]},
+ {cluster_size_1_network, [], ClusterSize1Tests},
+ {cluster_size_2_network, [], ClusterSize2Tests},
+ {cluster_size_1_direct, [], ClusterSize1Tests},
+ {cluster_size_2_direct, [], ClusterSize2Tests},
{cluster_rename, [], [
vhost_limit_after_node_renamed
]}
@@ -80,10 +86,19 @@ init_per_suite(Config) ->
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
-init_per_group(cluster_size_1, Config) ->
- init_per_multinode_group(cluster_size_1, Config, 1);
-init_per_group(cluster_size_2, Config) ->
- init_per_multinode_group(cluster_size_2, Config, 2);
+init_per_group(cluster_size_1_network, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, network}]),
+ init_per_multinode_group(cluster_size_1_network, Config1, 1);
+init_per_group(cluster_size_2_network, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, network}]),
+ init_per_multinode_group(cluster_size_2_network, Config1, 2);
+init_per_group(cluster_size_1_direct, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, direct}]),
+ init_per_multinode_group(cluster_size_1_direct, Config1, 1);
+init_per_group(cluster_size_2_direct, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, direct}]),
+ init_per_multinode_group(cluster_size_2_direct, Config1, 2);
+
init_per_group(cluster_rename, Config) ->
init_per_multinode_group(cluster_rename, Config, 2).
@@ -706,12 +721,17 @@ vhost_limit_after_node_renamed(Config) ->
%% -------------------------------------------------------------------
open_connections(Config, NodesAndVHosts) ->
+ % Randomly select connection type
+ OpenConnectionFun = case ?config(connection_type, Config) of
+ network -> open_unmanaged_connection;
+ direct -> open_unmanaged_connection_direct
+ end,
Conns = lists:map(fun
({Node, VHost}) ->
- rabbit_ct_client_helpers:open_unmanaged_connection(Config, Node,
+ rabbit_ct_client_helpers:OpenConnectionFun(Config, Node,
VHost);
(Node) ->
- rabbit_ct_client_helpers:open_unmanaged_connection(Config, Node)
+ rabbit_ct_client_helpers:OpenConnectionFun(Config, Node)
end, NodesAndVHosts),
timer:sleep(500),
Conns.