summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2016-09-15 17:05:23 +0300
committerMichael Klishin <michael@clojurewerkz.org>2016-09-15 17:05:23 +0300
commit0e60c60954378b4339ed77d1e5b71e07f0bac002 (patch)
tree1ee03ed1fddf3c726422649b3d4b9a8233347664 /src
parent6577b1041b5e3f0b4d8bf89593afdf3c5d23017a (diff)
parent1b0096a925ba56af16d4776713a5c8e9593c587a (diff)
downloadrabbitmq-server-git-0e60c60954378b4339ed77d1e5b71e07f0bac002.tar.gz
Merge branch 'master' into rabbitmq-server-501
Diffstat (limited to 'src')
-rw-r--r--src/gm.erl7
-rw-r--r--src/rabbit.app.src3
-rw-r--r--src/rabbit_connection_tracking.erl2
-rw-r--r--src/rabbit_connection_tracking_handler.erl13
-rw-r--r--src/rabbit_direct.erl73
5 files changed, 71 insertions, 27 deletions
diff --git a/src/gm.erl b/src/gm.erl
index 0b5c1c44c4..aef23c7269 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -1137,7 +1137,7 @@ record_dead_member_in_group(Self, Member, GroupName, TxnFun, Verify) ->
true ->
check_membership(Self, read_group(GroupName));
false ->
- read_group(GroupName)
+ check_group(read_group(GroupName))
end,
case lists:splitwith(
fun (Member1) -> Member1 =/= Member end, Members) of
@@ -1615,3 +1615,8 @@ check_membership(GroupName) ->
{error, not_found} ->
throw(lost_membership)
end.
+
+check_group({error, not_found}) ->
+ throw(lost_membership);
+check_group(Any) ->
+ Any.
diff --git a/src/rabbit.app.src b/src/rabbit.app.src
index 738a38e2bb..2e17bbbc3c 100644
--- a/src/rabbit.app.src
+++ b/src/rabbit.app.src
@@ -98,7 +98,8 @@
{msg_store_credit_disc_bound, {2000, 500}},
{msg_store_io_batch_size, 2048},
%% see rabbitmq-server#143
- {credit_flow_default_credit, {200, 50}},
+ %% and rabbitmq-server#949
+ {credit_flow_default_credit, {200, 100}},
%% see rabbitmq-server#248
%% and rabbitmq-server#667
{channel_operation_timeout, 15000}
diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl
index ab945abc2f..b1d34ca516 100644
--- a/src/rabbit_connection_tracking.erl
+++ b/src/rabbit_connection_tracking.erl
@@ -315,6 +315,7 @@ tracked_connection_from_connection_created(EventDetails) ->
username = pget(user, EventDetails),
connected_at = pget(connected_at, EventDetails),
pid = pget(pid, EventDetails),
+ type = pget(type, EventDetails),
peer_host = pget(peer_host, EventDetails),
peer_port = pget(peer_port, EventDetails)}.
@@ -333,5 +334,6 @@ tracked_connection_from_connection_state(#connection{
{user, Username},
{connected_at, Ts},
{pid, self()},
+ {type, network},
{peer_port, PeerPort},
{peer_host, PeerHost}]).
diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl
index fd1df8c88a..598fe686c3 100644
--- a/src/rabbit_connection_tracking_handler.erl
+++ b/src/rabbit_connection_tracking_handler.erl
@@ -77,9 +77,9 @@ handle_event(#event{type = connection_closed, props = Details}, State) ->
handle_event(#event{type = vhost_deleted, props = Details}, State) ->
VHost = pget(name, Details),
rabbit_log_connection:info("Closing all connections in vhost '~s' because it's being deleted", [VHost]),
- [rabbit_networking:close_connection(Pid, rabbit_misc:format("vhost '~s' is deleted", [VHost])) ||
- #tracked_connection{pid = Pid} <- rabbit_connection_tracking:list(VHost)],
- {ok, State};
+ [close_connection(Conn, rabbit_misc:format("vhost '~s' is deleted", [VHost]))
+ || Conn <- rabbit_connection_tracking:list(VHost)],
+ {ok, State};
handle_event(#event{type = user_deleted, props = Details}, State) ->
_Username = pget(name, Details),
%% TODO: force close and unregister connections from
@@ -106,3 +106,10 @@ terminate(_Arg, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+
+close_connection(#tracked_connection{pid = Pid, type = network}, Message) ->
+ rabbit_networking:close_connection(Pid, Message);
+close_connection(#tracked_connection{pid = Pid, type = direct}, Message) ->
+ %% Do an RPC call to the node running the direct client.
+ Node = node(Pid),
+ rpc:call(Node, amqp_direct_connection, server_close, [Pid, 320, Message]).
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 061105c150..858681ecfd 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -65,35 +65,64 @@ list() ->
%%----------------------------------------------------------------------------
-connect({none, _}, VHost, Protocol, Pid, Infos) ->
- connect0(fun () -> {ok, rabbit_auth_backend_dummy:user()} end,
- VHost, Protocol, Pid, Infos);
+auth_fun({none, _}, _VHost) ->
+ fun () -> {ok, rabbit_auth_backend_dummy:user()} end;
-connect({Username, none}, VHost, Protocol, Pid, Infos) ->
- connect0(fun () -> rabbit_access_control:check_user_login(Username, []) end,
- VHost, Protocol, Pid, Infos);
+auth_fun({Username, none}, _VHost) ->
+ fun () -> rabbit_access_control:check_user_login(Username, []) end;
-connect({Username, Password}, VHost, Protocol, Pid, Infos) ->
- connect0(fun () -> rabbit_access_control:check_user_login(
- Username, [{password, Password}, {vhost, VHost}]) end,
- VHost, Protocol, Pid, Infos).
+auth_fun({Username, Password}, VHost) ->
+ fun () ->
+ rabbit_access_control:check_user_login(
+ Username,
+ [{password, Password}, {vhost, VHost}])
+ end.
-connect0(AuthFun, VHost, Protocol, Pid, Infos) ->
+connect(Creds, VHost, Protocol, Pid, Infos) ->
+ AuthFun = auth_fun(Creds, VHost),
case rabbit:is_running() of
- true -> case AuthFun() of
- {ok, User = #user{username = Username}} ->
- notify_auth_result(Username,
- user_authentication_success, []),
- connect1(User, VHost, Protocol, Pid, Infos);
- {refused, Username, Msg, Args} ->
- notify_auth_result(Username,
- user_authentication_failure,
- [{error, rabbit_misc:format(Msg, Args)}]),
- {error, {auth_failure, "Refused"}}
- end;
+ true ->
+ case is_over_connection_limit(VHost, Creds, Pid) of
+ true ->
+ {error, not_allowed};
+ false ->
+ case AuthFun() of
+ {ok, User = #user{username = Username}} ->
+ notify_auth_result(Username,
+ user_authentication_success, []),
+ connect1(User, VHost, Protocol, Pid, Infos);
+ {refused, Username, Msg, Args} ->
+ notify_auth_result(Username,
+ user_authentication_failure,
+ [{error, rabbit_misc:format(Msg, Args)}]),
+ {error, {auth_failure, "Refused"}}
+ end
+ end;
false -> {error, broker_not_found_on_node}
end.
+is_over_connection_limit(VHost, {Username, _Password}, Pid) ->
+ PrintedUsername = case Username of
+ none -> "";
+ _ -> Username
+ end,
+ try rabbit_connection_tracking:is_over_connection_limit(VHost) of
+ false -> false;
+ {true, Limit} ->
+ rabbit_log_connection:error(
+ "Error on Direct connection ~p~n"
+ "access to vhost '~s' refused for user '~s': "
+ "connection limit (~p) is reached",
+ [Pid, VHost, PrintedUsername, Limit]),
+ true
+ catch
+ throw:{error, {no_such_vhost, VHost}} ->
+ rabbit_log_connection:error(
+ "Error on Direct connection ~p~n"
+ "vhost ~s not found", [Pid, VHost]),
+ true
+ end.
+
notify_auth_result(Username, AuthResult, ExtraProps) ->
EventProps = [{connection_type, direct},
{name, case Username of none -> ''; _ -> Username end}] ++