summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-08-05 15:56:46 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-08-05 15:56:46 +0100
commit5e7719a0e55e57b59c2ef40ad41b8e03eb116120 (patch)
treed4ed9b42d7f456ef8cb206a9c714515f23a6cc8c
parent22a99dc7c4040a53c78d74baa29a0ede6f87b905 (diff)
parentfbdc8c26f8915d72548415e2ff20fe546968b2b8 (diff)
downloadrabbitmq-server-git-5e7719a0e55e57b59c2ef40ad41b8e03eb116120.tar.gz
merge default into bug23027
-rw-r--r--docs/rabbitmqctl.1.xml23
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/rabbit.erl4
-rw-r--r--src/rabbit_access_control.erl32
-rw-r--r--src/rabbit_amqqueue.erl10
-rw-r--r--src/rabbit_amqqueue_process.erl4
-rw-r--r--src/rabbit_control.erl141
-rw-r--r--src/rabbit_event.erl9
-rw-r--r--src/rabbit_misc.erl37
-rw-r--r--src/rabbit_queue_collector.erl12
-rw-r--r--src/rabbit_reader.erl19
-rw-r--r--src/rabbit_router.erl4
-rw-r--r--src/rabbit_tests.erl91
13 files changed, 252 insertions, 136 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index a7d064f1e7..33552e17cc 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -88,9 +88,6 @@
</listitem>
</varlistentry>
</variablelist>
- <para>
- Flags must precede all other parameters to <command>rabbitmqctl</command>.
- </para>
</refsect1>
<refsect1>
@@ -271,7 +268,7 @@
<variablelist>
<varlistentry id="cluster">
- <term><cmdsynopsis><command>cluster</command><arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term>
+ <term><cmdsynopsis><command>cluster</command> <arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
@@ -336,7 +333,7 @@
</listitem>
</varlistentry>
<varlistentry id="force_cluster">
- <term><cmdsynopsis><command>force_cluster</command><arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term>
+ <term><cmdsynopsis><command>force_cluster</command> <arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
@@ -547,7 +544,7 @@
</varlistentry>
<varlistentry>
- <term><cmdsynopsis><command>set_permissions</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>username</replaceable></arg> <arg choice="req"><replaceable>configure</replaceable></arg> <arg choice="req"><replaceable>write</replaceable></arg> <arg choice="req"><replaceable>read</replaceable></arg></cmdsynopsis></term>
+ <term><cmdsynopsis><command>set_permissions</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="opt">-s <replaceable>scope</replaceable></arg> <arg choice="req"><replaceable>user</replaceable></arg> <arg choice="req"><replaceable>conf</replaceable></arg> <arg choice="req"><replaceable>write</replaceable></arg> <arg choice="req"><replaceable>read</replaceable></arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
@@ -555,11 +552,21 @@
<listitem><para>The name of the virtual host to which to grant the user access, defaulting to <command>/</command>.</para></listitem>
</varlistentry>
<varlistentry>
- <term>username</term>
+ <term>scope</term>
+ <listitem><para>Scope of the permissions: either
+ <command>client</command> (the default) or
+ <command>all</command>. This determines whether
+ permissions are checked for server-generated resource
+ names (<command>all</command>) or only for
+ client-specified resource names
+ (<command>client</command>).</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>user</term>
<listitem><para>The name of the user to grant access to the specified virtual host.</para></listitem>
</varlistentry>
<varlistentry>
- <term>configure</term>
+ <term>conf</term>
<listitem><para>A regular expression matching resource names for which the user is granted configure permissions.</para></listitem>
</varlistentry>
<varlistentry>
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index a2529e2d19..b9abd78857 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -30,7 +30,7 @@
%%
-record(user, {username, password}).
--record(permission, {configure, write, read}).
+-record(permission, {scope, configure, write, read}).
-record(user_vhost, {username, virtual_host}).
-record(user_permission, {user_vhost, permission}).
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 1fab7e4d82..41c628a071 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -90,9 +90,9 @@
{enables, kernel_ready}]}).
-rabbit_boot_step({rabbit_event,
- [{description, "statistics event handler"},
+ [{description, "statistics event manager"},
{mfa, {rabbit_sup, start_restartable_child,
- [gen_event, [{local, rabbit_event}]]}},
+ [rabbit_event]}},
{requires, external_infrastructure},
{enables, kernel_ready}]}).
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 1dbda8058e..8d00f59124 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -38,7 +38,7 @@
-export([add_user/2, delete_user/1, change_password/2, list_users/0,
lookup_user/1]).
-export([add_vhost/1, delete_vhost/1, list_vhosts/0]).
--export([set_permissions/5, clear_permissions/2,
+-export([set_permissions/5, set_permissions/6, clear_permissions/2,
list_vhost_permissions/1, list_user_permissions/1]).
%%----------------------------------------------------------------------------
@@ -51,6 +51,7 @@
-type(username() :: binary()).
-type(password() :: binary()).
-type(regexp() :: binary()).
+-type(scope() :: binary()).
-spec(check_login/2 ::
(binary(), binary()) -> rabbit_types:user() |
@@ -78,6 +79,8 @@
-spec(list_vhosts/0 :: () -> [rabbit_types:vhost()]).
-spec(set_permissions/5 ::(username(), rabbit_types:vhost(), regexp(),
regexp(), regexp()) -> 'ok').
+-spec(set_permissions/6 ::(scope(), username(), rabbit_types:vhost(),
+ regexp(), regexp(), regexp()) -> 'ok').
-spec(clear_permissions/2 :: (username(), rabbit_types:vhost()) -> 'ok').
-spec(list_vhost_permissions/1 ::
(rabbit_types:vhost())
@@ -157,6 +160,7 @@ check_vhost_access(#user{username = Username}, VHostPath) ->
[VHostPath, Username])
end.
+permission_index(scope) -> #permission.scope;
permission_index(configure) -> #permission.configure;
permission_index(write) -> #permission.write;
permission_index(read) -> #permission.read.
@@ -169,7 +173,7 @@ check_resource_access(Username,
Permission);
check_resource_access(_Username,
#resource{name = <<"amq.gen",_/binary>>},
- _Permission) ->
+ #permission{scope = client}) ->
ok;
check_resource_access(Username,
R = #resource{virtual_host = VHostPath, name = Name},
@@ -300,7 +304,7 @@ internal_delete_vhost(VHostPath) ->
ok = rabbit_exchange:delete(Name, false)
end,
rabbit_exchange:list(VHostPath)),
- lists:foreach(fun ({Username, _, _, _}) ->
+ lists:foreach(fun ({Username, _, _, _, _}) ->
ok = clear_permissions(Username, VHostPath)
end,
list_vhost_permissions(VHostPath)),
@@ -318,7 +322,16 @@ validate_regexp(RegexpBin) ->
end.
set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
+ set_permissions(<<"client">>, Username, VHostPath, ConfigurePerm,
+ WritePerm, ReadPerm).
+
+set_permissions(ScopeBin, Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]),
+ Scope = case ScopeBin of
+ <<"client">> -> client;
+ <<"all">> -> all;
+ _ -> throw({error, {invalid_scope, ScopeBin}})
+ end,
rabbit_misc:execute_mnesia_transaction(
rabbit_misc:with_user_and_vhost(
Username, VHostPath,
@@ -328,12 +341,14 @@ set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
username = Username,
virtual_host = VHostPath},
permission = #permission{
+ scope = Scope,
configure = ConfigurePerm,
write = WritePerm,
read = ReadPerm}},
write)
end)).
+
clear_permissions(Username, VHostPath) ->
rabbit_misc:execute_mnesia_transaction(
rabbit_misc:with_user_and_vhost(
@@ -345,22 +360,23 @@ clear_permissions(Username, VHostPath) ->
end)).
list_vhost_permissions(VHostPath) ->
- [{Username, ConfigurePerm, WritePerm, ReadPerm} ||
- {Username, _, ConfigurePerm, WritePerm, ReadPerm} <-
+ [{Username, ConfigurePerm, WritePerm, ReadPerm, Scope} ||
+ {Username, _, ConfigurePerm, WritePerm, ReadPerm, Scope} <-
list_permissions(rabbit_misc:with_vhost(
VHostPath, match_user_vhost('_', VHostPath)))].
list_user_permissions(Username) ->
- [{VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
- {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <-
+ [{VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} ||
+ {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} <-
list_permissions(rabbit_misc:with_user(
Username, match_user_vhost(Username, '_')))].
list_permissions(QueryThunk) ->
- [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
+ [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} ||
#user_permission{user_vhost = #user_vhost{username = Username,
virtual_host = VHostPath},
permission = #permission{
+ scope = Scope,
configure = ConfigurePerm,
write = WritePerm,
read = ReadPerm}} <-
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index d4226331c3..b55d5b210c 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -355,7 +355,7 @@ consumers_all(VHostPath) ->
stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity).
-emit_stats(#amqqueue{pid = QPid}) ->
+emit_stats(#amqqueue{pid = QPid}) ->
delegate_pcast(QPid, 7, emit_stats).
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
@@ -386,7 +386,6 @@ reject(QPid, MsgIds, Requeue, ChPid) ->
commit_all(QPids, Txn, ChPid) ->
safe_delegate_call_ok(
- fun (QPid) -> exit({queue_disappeared, QPid}) end,
fun (QPid) -> gen_server2:call(QPid, {commit, Txn, ChPid}, infinity) end,
QPids).
@@ -396,9 +395,6 @@ rollback_all(QPids, Txn, ChPid) ->
notify_down_all(QPids, ChPid) ->
safe_delegate_call_ok(
- %% we don't care if the queue process has terminated in the
- %% meantime
- fun (_) -> ok end,
fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, infinity) end,
QPids).
@@ -493,11 +489,11 @@ pseudo_queue(QueueName, Pid) ->
arguments = [],
pid = Pid}.
-safe_delegate_call_ok(H, F, Pids) ->
+safe_delegate_call_ok(F, Pids) ->
{_, Bad} = delegate:invoke(Pids,
fun (Pid) ->
rabbit_misc:with_exit_handler(
- fun () -> H(Pid) end,
+ fun () -> ok end,
fun () -> F(Pid) end)
end),
case Bad of
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5e1b1f716c..d52660c5ac 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -241,7 +241,7 @@ stop_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
stop_rate_timer(State = #q{rate_timer_ref = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
State#q{rate_timer_ref = undefined}.
-
+
stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) ->
State;
stop_expiry_timer(State = #q{expiry_timer_ref = TRef}) ->
@@ -887,7 +887,7 @@ handle_cast(maybe_expire, State) ->
{stop, normal, State};
false -> noreply(ensure_expiry_timer(State))
end;
-
+
handle_cast(emit_stats, State) ->
emit_stats(State),
noreply(State).
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 6e6ad06cb3..f0b623c2dc 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -32,20 +32,25 @@
-module(rabbit_control).
-include("rabbit.hrl").
--export([start/0, stop/0, action/4]).
-
--record(params, {quiet, node, command, args}).
+-export([start/0, stop/0, action/5]).
-define(RPC_TIMEOUT, infinity).
+-define(QUIET_OPT, "-q").
+-define(NODE_OPT, "-n").
+-define(VHOST_OPT, "-p").
+-define(SCOPE_OPT, "-s").
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-spec(start/0 :: () -> no_return()).
-spec(stop/0 :: () -> 'ok').
--spec(action/4 :: (atom(), node(), [string()],
- fun ((string(), [any()]) -> 'ok')) -> 'ok').
+-spec(action/5 ::
+ (atom(), node(), [string()], [{string(), any()}],
+ fun ((string(), [any()]) -> 'ok'))
+ -> 'ok').
-spec(usage/0 :: () -> no_return()).
-endif.
@@ -55,18 +60,33 @@
start() ->
{ok, [[NodeStr|_]|_]} = init:get_argument(nodename),
FullCommand = init:get_plain_arguments(),
- #params{quiet = Quiet, node = Node, command = Command, args = Args} =
- parse_args(FullCommand, #params{quiet = false,
- node = rabbit_misc:makenode(NodeStr)}),
+ case FullCommand of
+ [] -> usage();
+ _ -> ok
+ end,
+ {[Command0 | Args], Opts} =
+ rabbit_misc:get_options(
+ [{flag, ?QUIET_OPT}, {option, ?NODE_OPT, NodeStr},
+ {option, ?VHOST_OPT, "/"}, {option, ?SCOPE_OPT, "client"}],
+ FullCommand),
+ Opts1 = lists:map(fun({K, V}) ->
+ case K of
+ ?NODE_OPT -> {?NODE_OPT, rabbit_misc:makenode(V)};
+ _ -> {K, V}
+ end
+ end, Opts),
+ Command = list_to_atom(Command0),
+ Quiet = proplists:get_bool(?QUIET_OPT, Opts1),
+ Node = proplists:get_value(?NODE_OPT, Opts1),
Inform = case Quiet of
true -> fun (_Format, _Args1) -> ok end;
false -> fun (Format, Args1) ->
io:format(Format ++ " ...~n", Args1)
- end
+ end
end,
%% The reason we don't use a try/catch here is that rpc:call turns
%% thrown errors into normal return values
- case catch action(Command, Node, Args, Inform) of
+ case catch action(Command, Node, Args, Opts, Inform) of
ok ->
case Quiet of
true -> ok;
@@ -118,15 +138,6 @@ print_badrpc_diagnostics(Node) ->
fmt_stderr("- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]),
ok.
-parse_args(["-n", NodeS | Args], Params) ->
- parse_args(Args, Params#params{node = rabbit_misc:makenode(NodeS)});
-parse_args(["-q" | Args], Params) ->
- parse_args(Args, Params#params{quiet = true});
-parse_args([Command | Args], Params) ->
- Params#params{command = list_to_atom(Command), args = Args};
-parse_args([], _) ->
- usage().
-
stop() ->
ok.
@@ -134,39 +145,39 @@ usage() ->
io:format("~s", [rabbit_ctl_usage:usage()]),
halt(1).
-action(stop, Node, [], Inform) ->
+action(stop, Node, [], _Opts, Inform) ->
Inform("Stopping and halting node ~p", [Node]),
call(Node, {rabbit, stop_and_halt, []});
-action(stop_app, Node, [], Inform) ->
+action(stop_app, Node, [], _Opts, Inform) ->
Inform("Stopping node ~p", [Node]),
call(Node, {rabbit, stop, []});
-action(start_app, Node, [], Inform) ->
+action(start_app, Node, [], _Opts, Inform) ->
Inform("Starting node ~p", [Node]),
call(Node, {rabbit, start, []});
-action(reset, Node, [], Inform) ->
+action(reset, Node, [], _Opts, Inform) ->
Inform("Resetting node ~p", [Node]),
call(Node, {rabbit_mnesia, reset, []});
-action(force_reset, Node, [], Inform) ->
+action(force_reset, Node, [], _Opts, Inform) ->
Inform("Forcefully resetting node ~p", [Node]),
call(Node, {rabbit_mnesia, force_reset, []});
-action(cluster, Node, ClusterNodeSs, Inform) ->
+action(cluster, Node, ClusterNodeSs, _Opts, Inform) ->
ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs),
Inform("Clustering node ~p with ~p",
[Node, ClusterNodes]),
rpc_call(Node, rabbit_mnesia, cluster, [ClusterNodes]);
-action(force_cluster, Node, ClusterNodeSs, Inform) ->
+action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) ->
ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs),
Inform("Forcefully clustering node ~p with ~p (ignoring offline nodes)",
[Node, ClusterNodes]),
rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]);
-action(status, Node, [], Inform) ->
+action(status, Node, [], _Opts, Inform) ->
Inform("Status of node ~p", [Node]),
case call(Node, {rabbit, status, []}) of
{badrpc, _} = Res -> Res;
@@ -174,129 +185,117 @@ action(status, Node, [], Inform) ->
ok
end;
-action(rotate_logs, Node, [], Inform) ->
+action(rotate_logs, Node, [], _Opts, Inform) ->
Inform("Reopening logs for node ~p", [Node]),
call(Node, {rabbit, rotate_logs, [""]});
-action(rotate_logs, Node, Args = [Suffix], Inform) ->
+action(rotate_logs, Node, Args = [Suffix], _Opts, Inform) ->
Inform("Rotating logs to files with suffix ~p", [Suffix]),
call(Node, {rabbit, rotate_logs, Args});
-action(close_connection, Node, [PidStr, Explanation], Inform) ->
+action(close_connection, Node, [PidStr, Explanation], _Opts, Inform) ->
Inform("Closing connection ~s", [PidStr]),
rpc_call(Node, rabbit_networking, close_connection,
[rabbit_misc:string_to_pid(PidStr), Explanation]);
-action(add_user, Node, Args = [Username, _Password], Inform) ->
+action(add_user, Node, Args = [Username, _Password], _Opts, Inform) ->
Inform("Creating user ~p", [Username]),
call(Node, {rabbit_access_control, add_user, Args});
-action(delete_user, Node, Args = [_Username], Inform) ->
+action(delete_user, Node, Args = [_Username], _Opts, Inform) ->
Inform("Deleting user ~p", Args),
call(Node, {rabbit_access_control, delete_user, Args});
-action(change_password, Node, Args = [Username, _Newpassword], Inform) ->
+action(change_password, Node, Args = [Username, _Newpassword], _Opts, Inform) ->
Inform("Changing password for user ~p", [Username]),
call(Node, {rabbit_access_control, change_password, Args});
-action(list_users, Node, [], Inform) ->
+action(list_users, Node, [], _Opts, Inform) ->
Inform("Listing users", []),
display_list(call(Node, {rabbit_access_control, list_users, []}));
-action(add_vhost, Node, Args = [_VHostPath], Inform) ->
+action(add_vhost, Node, Args = [_VHostPath], _Opts, Inform) ->
Inform("Creating vhost ~p", Args),
call(Node, {rabbit_access_control, add_vhost, Args});
-action(delete_vhost, Node, Args = [_VHostPath], Inform) ->
+action(delete_vhost, Node, Args = [_VHostPath], _Opts, Inform) ->
Inform("Deleting vhost ~p", Args),
call(Node, {rabbit_access_control, delete_vhost, Args});
-action(list_vhosts, Node, [], Inform) ->
+action(list_vhosts, Node, [], _Opts, Inform) ->
Inform("Listing vhosts", []),
display_list(call(Node, {rabbit_access_control, list_vhosts, []}));
-action(list_user_permissions, Node, Args = [_Username], Inform) ->
+action(list_user_permissions, Node, Args = [_Username], _Opts, Inform) ->
Inform("Listing permissions for user ~p", Args),
display_list(call(Node, {rabbit_access_control, list_user_permissions,
Args}));
-action(list_queues, Node, Args, Inform) ->
+action(list_queues, Node, Args, Opts, Inform) ->
Inform("Listing queues", []),
- {VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args),
- ArgAtoms = default_if_empty(RemainingArgs, [name, messages]),
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ ArgAtoms = default_if_empty(Args, [name, messages]),
display_info_list(rpc_call(Node, rabbit_amqqueue, info_all,
[VHostArg, ArgAtoms]),
ArgAtoms);
-action(list_exchanges, Node, Args, Inform) ->
+action(list_exchanges, Node, Args, Opts, Inform) ->
Inform("Listing exchanges", []),
- {VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args),
- ArgAtoms = default_if_empty(RemainingArgs, [name, type]),
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ ArgAtoms = default_if_empty(Args, [name, type]),
display_info_list(rpc_call(Node, rabbit_exchange, info_all,
[VHostArg, ArgAtoms]),
ArgAtoms);
-action(list_bindings, Node, Args, Inform) ->
+action(list_bindings, Node, _Args, Opts, Inform) ->
Inform("Listing bindings", []),
- {VHostArg, _} = parse_vhost_flag_bin(Args),
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
InfoKeys = [exchange_name, queue_name, routing_key, args],
display_info_list(
[lists:zip(InfoKeys, tuple_to_list(X)) ||
X <- rpc_call(Node, rabbit_exchange, list_bindings, [VHostArg])],
InfoKeys);
-action(list_connections, Node, Args, Inform) ->
+action(list_connections, Node, Args, _Opts, Inform) ->
Inform("Listing connections", []),
ArgAtoms = default_if_empty(Args, [user, peer_address, peer_port, state]),
display_info_list(rpc_call(Node, rabbit_networking, connection_info_all,
[ArgAtoms]),
ArgAtoms);
-action(list_channels, Node, Args, Inform) ->
+action(list_channels, Node, Args, _Opts, Inform) ->
Inform("Listing channels", []),
ArgAtoms = default_if_empty(Args, [pid, user, transactional, consumer_count,
messages_unacknowledged]),
display_info_list(rpc_call(Node, rabbit_channel, info_all, [ArgAtoms]),
ArgAtoms);
-action(list_consumers, Node, Args, Inform) ->
+action(list_consumers, Node, _Args, Opts, Inform) ->
Inform("Listing consumers", []),
- {VHostArg, _} = parse_vhost_flag_bin(Args),
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
InfoKeys = [queue_name, channel_pid, consumer_tag, ack_required],
display_info_list(
[lists:zip(InfoKeys, tuple_to_list(X)) ||
X <- rpc_call(Node, rabbit_amqqueue, consumers_all, [VHostArg])],
InfoKeys);
-action(Command, Node, Args, Inform) ->
- {VHost, RemainingArgs} = parse_vhost_flag(Args),
- action(Command, Node, VHost, RemainingArgs, Inform).
-
-action(set_permissions, Node, VHost, [Username, CPerm, WPerm, RPerm], Inform) ->
+action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) ->
+ VHost = proplists:get_value(?VHOST_OPT, Opts),
+ Scope = proplists:get_value(?SCOPE_OPT, Opts),
Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]),
call(Node, {rabbit_access_control, set_permissions,
- [Username, VHost, CPerm, WPerm, RPerm]});
+ [Scope, Username, VHost, CPerm, WPerm, RPerm]});
-action(clear_permissions, Node, VHost, [Username], Inform) ->
+action(clear_permissions, Node, [Username], Opts, Inform) ->
+ VHost = proplists:get_value(?VHOST_OPT, Opts),
Inform("Clearing permissions for user ~p in vhost ~p", [Username, VHost]),
call(Node, {rabbit_access_control, clear_permissions, [Username, VHost]});
-action(list_permissions, Node, VHost, [], Inform) ->
+action(list_permissions, Node, [], Opts, Inform) ->
+ VHost = proplists:get_value(?VHOST_OPT, Opts),
Inform("Listing permissions in vhost ~p", [VHost]),
display_list(call(Node, {rabbit_access_control, list_vhost_permissions,
[VHost]})).
-parse_vhost_flag(Args) when is_list(Args) ->
- case Args of
- ["-p", VHost | RemainingArgs] ->
- {VHost, RemainingArgs};
- RemainingArgs ->
- {"/", RemainingArgs}
- end.
-
-parse_vhost_flag_bin(Args) ->
- {VHost, RemainingArgs} = parse_vhost_flag(Args),
- {list_to_binary(VHost), RemainingArgs}.
-
default_if_empty(List, Default) when is_list(List) ->
if List == [] ->
Default;
@@ -357,6 +356,8 @@ rpc_call(Node, Mod, Fun, Args) ->
%% characters. We don't escape characters above 127, since they may
%% form part of UTF-8 strings.
+escape(Atom) when is_atom(Atom) ->
+ escape(atom_to_list(Atom));
escape(Bin) when is_binary(Bin) ->
escape(binary_to_list(Bin));
escape(L) when is_list(L) ->
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index 07027cd5c6..113ffcb4bc 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -33,6 +33,7 @@
-include("rabbit.hrl").
+-export([start_link/0]).
-export([init_stats_timer/0, ensure_stats_timer/3, stop_stats_timer/2]).
-export([ensure_stats_timer_after/2, reset_stats_timer_after/1]).
-export([stats_level/1]).
@@ -68,6 +69,7 @@
-type(timer_fun() :: fun (() -> 'ok')).
+-spec(start_link/0 :: () -> rabbit_types:ok_or_error2(pid(), any())).
-spec(init_stats_timer/0 :: () -> state()).
-spec(ensure_stats_timer/3 :: (state(), timer_fun(), timer_fun()) -> state()).
-spec(stop_stats_timer/2 :: (state(), timer_fun()) -> state()).
@@ -80,6 +82,9 @@
%%----------------------------------------------------------------------------
+start_link() ->
+ gen_event:start_link({local, ?MODULE}).
+
init_stats_timer() ->
{ok, StatsLevel} = application:get_env(rabbit, collect_statistics),
#state{level = StatsLevel, timer = undefined}.
@@ -120,9 +125,11 @@ stats_level(#state{level = Level}) ->
notify(Type, Props) ->
try
+ %% TODO: switch to os:timestamp() when we drop support for
+ %% Erlang/OTP < R13B01
gen_event:notify(rabbit_event, #event{type = Type,
props = Props,
- timestamp = os:timestamp()})
+ timestamp = now()})
catch error:badarg ->
%% badarg means rabbit_event is no longer registered. We never
%% unregister it so the great likelihood is that we're shutting
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 3b665d387e..638596b724 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -64,6 +64,7 @@
-export([version_compare/2, version_compare/3]).
-export([recursive_delete/1, dict_cons/3, orddict_cons/3,
unlink_and_capture_exit/1]).
+-export([get_options/2]).
-import(mnesia).
-import(lists).
@@ -79,6 +80,7 @@
-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
-type(thunk(T) :: fun(() -> T)).
-type(resource_name() :: binary()).
+-type(optdef() :: {flag, string()} | {option, string(), any()}).
-type(channel_or_connection_exit()
:: rabbit_types:channel_exit() | rabbit_types:connection_exit()).
@@ -185,6 +187,8 @@
-spec(orddict_cons/3 :: (any(), any(), orddict:dictionary()) ->
orddict:dictionary()).
-spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok').
+-spec(get_options/2 :: ([optdef()], [string()])
+ -> {[string()], [{string(), any()}]}).
-endif.
@@ -704,3 +708,36 @@ unlink_and_capture_exit(Pid) ->
receive {'EXIT', Pid, _} -> ok
after 0 -> ok
end.
+
+% Separate flags and options from arguments.
+% get_options([{flag, "-q"}, {option, "-p", "/"}],
+% ["set_permissions","-p","/","guest",
+% "-q",".*",".*",".*"])
+% == {["set_permissions","guest",".*",".*",".*"],
+% [{"-q",true},{"-p","/"}]}
+get_options(Defs, As) ->
+ lists:foldl(fun(Def, {AsIn, RsIn}) ->
+ {AsOut, Value} = case Def of
+ {flag, Key} ->
+ get_flag(Key, AsIn);
+ {option, Key, Default} ->
+ get_option(Key, Default, AsIn)
+ end,
+ {AsOut, [{Key, Value} | RsIn]}
+ end, {As, []}, Defs).
+
+get_option(K, _Default, [K, V | As]) ->
+ {As, V};
+get_option(K, Default, [Nk | As]) ->
+ {As1, V} = get_option(K, Default, As),
+ {[Nk | As1], V};
+get_option(_, Default, As) ->
+ {As, Default}.
+
+get_flag(K, [K | As]) ->
+ {As, true};
+get_flag(K, [Nk | As]) ->
+ {As1, V} = get_flag(K, As),
+ {[Nk | As1], V};
+get_flag(_, []) ->
+ {[], false}.
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index ea3768d4b4..9257ec82ab 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -49,6 +49,7 @@
-spec(start_link/0 :: () -> rabbit_types:ok(pid())).
-spec(register/2 :: (pid(), rabbit_types:amqqueue()) -> 'ok').
-spec(delete_all/1 :: (pid()) -> 'ok').
+-spec(shutdown/1 :: (pid()) -> 'ok').
-endif.
@@ -64,7 +65,7 @@ delete_all(CollectorPid) ->
gen_server:call(CollectorPid, delete_all, infinity).
shutdown(CollectorPid) ->
- gen_server:call(CollectorPid, shutdown, infinity).
+ gen_server:cast(CollectorPid, shutdown).
%%----------------------------------------------------------------------------
@@ -87,13 +88,10 @@ handle_call(delete_all, _From, State = #state{queues = Queues}) ->
rabbit_amqqueue:delete(Q, false, false)
end)
|| {MonitorRef, Q} <- dict:to_list(Queues)],
- {reply, ok, State};
+ {reply, ok, State}.
-handle_call(shutdown, _From, State) ->
- {stop, normal, ok, State}.
-
-handle_cast(_Msg, State) ->
- {noreply, State}.
+handle_cast(shutdown, State) ->
+ {stop, normal, State}.
handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason},
State = #state{queues = Queues}) ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index a0e33ed08e..57c2399058 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -308,8 +308,8 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
%%
%% gen_tcp:close(ClientSock),
teardown_profiling(ProfilingValue),
- rabbit_queue_collector:shutdown(Collector),
rabbit_misc:unlink_and_capture_exit(Collector),
+ rabbit_queue_collector:shutdown(Collector),
rabbit_event:notify(connection_closed, [{pid, self()}])
end,
done.
@@ -422,8 +422,14 @@ internal_conserve_memory(false, State = #v1{connection_state = blocked,
internal_conserve_memory(_Conserve, State) ->
State.
-close_connection(State = #v1{connection = #connection{
+close_connection(State = #v1{queue_collector = Collector,
+ connection = #connection{
timeout_sec = TimeoutSec}}) ->
+ %% The spec says "Exclusive queues may only be accessed by the
+ %% current connection, and are deleted when that connection
+ %% closes." This does not strictly imply synchrony, but in
+ %% practice it seems to be what people assume.
+ rabbit_queue_collector:delete_all(Collector),
%% We terminate the connection after the specified interval, but
%% no later than ?CLOSING_TIMEOUT seconds.
TimeoutMillisec =
@@ -499,18 +505,13 @@ wait_for_channel_termination(N, TimerRef) ->
end.
maybe_close(State = #v1{connection_state = closing,
- queue_collector = Collector,
connection = #connection{protocol = Protocol},
sock = Sock}) ->
case all_channels() of
[] ->
- %% Spec says "Exclusive queues may only be accessed by the current
- %% connection, and are deleted when that connection closes."
- %% This does not strictly imply synchrony, but in practice it seems
- %% to be what people assume.
- rabbit_queue_collector:delete_all(Collector),
+ NewState = close_connection(State),
ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
- close_connection(State);
+ NewState;
_ -> State
end;
maybe_close(State) ->
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index d50b9f3126..ec049a1a2c 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -69,8 +69,8 @@ deliver(QPids, Delivery = #delivery{mandatory = false,
deliver(QPids, Delivery) ->
{Success, _} =
delegate:invoke(QPids,
- fun (Pid) ->
- rabbit_amqqueue:deliver(Pid, Delivery)
+ fun (Pid) ->
+ rabbit_amqqueue:deliver(Pid, Delivery)
end),
{Routed, Handled} =
lists:foldl(fun fold_deliveries/2, {false, []}, Success),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 923c3403e1..6812b8d4fe 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -68,6 +68,7 @@ all_tests() ->
passed = test_app_management(),
passed = test_log_management_during_startup(),
passed = test_statistics(),
+ passed = test_option_parser(),
passed = test_cluster_management(),
passed = test_user_management(),
passed = test_server_status(),
@@ -725,6 +726,30 @@ test_log_management_during_startup() ->
ok = control_action(start_app, []),
passed.
+test_option_parser() ->
+ % command and arguments should just pass through
+ ok = check_get_options({["mock_command", "arg1", "arg2"], []},
+ [], ["mock_command", "arg1", "arg2"]),
+
+ % get flags
+ ok = check_get_options(
+ {["mock_command", "arg1"], [{"-f", true}, {"-f2", false}]},
+ [{flag, "-f"}, {flag, "-f2"}], ["mock_command", "arg1", "-f"]),
+
+ % get options
+ ok = check_get_options(
+ {["mock_command"], [{"-foo", "bar"}, {"-baz", "notbaz"}]},
+ [{option, "-foo", "notfoo"}, {option, "-baz", "notbaz"}],
+ ["mock_command", "-foo", "bar"]),
+
+ % shuffled and interleaved arguments and options
+ ok = check_get_options(
+ {["a1", "a2", "a3"], [{"-o1", "hello"}, {"-o2", "noto2"}, {"-f", true}]},
+ [{option, "-o1", "noto1"}, {flag, "-f"}, {option, "-o2", "noto2"}],
+ ["-f", "a1", "-o1", "hello", "a2", "a3"]),
+
+ passed.
+
test_cluster_management() ->
%% 'cluster' and 'reset' should only work if the app is stopped
@@ -860,7 +885,7 @@ test_cluster_management2(SecondaryNode) ->
%% attempt to leave cluster when no other node is alive
ok = control_action(cluster, [SecondaryNodeS, NodeS]),
ok = control_action(start_app, []),
- ok = control_action(stop_app, SecondaryNode, []),
+ ok = control_action(stop_app, SecondaryNode, [], []),
ok = control_action(stop_app, []),
{error, {no_running_cluster_nodes, _, _}} =
control_action(reset, []),
@@ -868,9 +893,9 @@ test_cluster_management2(SecondaryNode) ->
%% leave system clustered, with the secondary node as a ram node
ok = control_action(force_reset, []),
ok = control_action(start_app, []),
- ok = control_action(force_reset, SecondaryNode, []),
- ok = control_action(cluster, SecondaryNode, [NodeS]),
- ok = control_action(start_app, SecondaryNode, []),
+ ok = control_action(force_reset, SecondaryNode, [], []),
+ ok = control_action(cluster, SecondaryNode, [NodeS], []),
+ ok = control_action(start_app, SecondaryNode, [], []),
passed.
@@ -890,9 +915,12 @@ test_user_management() ->
{error, {no_such_user, _}} =
control_action(list_user_permissions, ["foo"]),
{error, {no_such_vhost, _}} =
- control_action(list_permissions, ["-p", "/testhost"]),
+ control_action(list_permissions, [], [{"-p", "/testhost"}]),
{error, {invalid_regexp, _, _}} =
control_action(set_permissions, ["guest", "+foo", ".*", ".*"]),
+ {error, {invalid_scope, _}} =
+ control_action(set_permissions, ["guest", "foo", ".*", ".*"],
+ [{"-s", "cilent"}]),
%% user creation
ok = control_action(add_user, ["foo", "bar"]),
@@ -908,16 +936,21 @@ test_user_management() ->
ok = control_action(list_vhosts, []),
%% user/vhost mapping
- ok = control_action(set_permissions, ["-p", "/testhost",
- "foo", ".*", ".*", ".*"]),
- ok = control_action(set_permissions, ["-p", "/testhost",
- "foo", ".*", ".*", ".*"]),
- ok = control_action(list_permissions, ["-p", "/testhost"]),
+ ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"],
+ [{"-p", "/testhost"}]),
+ ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"],
+ [{"-p", "/testhost"}]),
+ ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"],
+ [{"-p", "/testhost"}, {"-s", "client"}]),
+ ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"],
+ [{"-p", "/testhost"}, {"-s", "all"}]),
+ ok = control_action(list_permissions, [], [{"-p", "/testhost"}]),
+ ok = control_action(list_permissions, [], [{"-p", "/testhost"}]),
ok = control_action(list_user_permissions, ["foo"]),
%% user/vhost unmapping
- ok = control_action(clear_permissions, ["-p", "/testhost", "foo"]),
- ok = control_action(clear_permissions, ["-p", "/testhost", "foo"]),
+ ok = control_action(clear_permissions, ["foo"], [{"-p", "/testhost"}]),
+ ok = control_action(clear_permissions, ["foo"], [{"-p", "/testhost"}]),
%% vhost deletion
ok = control_action(delete_vhost, ["/testhost"]),
@@ -926,8 +959,8 @@ test_user_management() ->
%% deleting a populated vhost
ok = control_action(add_vhost, ["/testhost"]),
- ok = control_action(set_permissions, ["-p", "/testhost",
- "foo", ".*", ".*", ".*"]),
+ ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"],
+ [{"-p", "/testhost"}]),
ok = control_action(delete_vhost, ["/testhost"]),
%% user deletion
@@ -1223,11 +1256,16 @@ test_delegates_sync(SecondaryNode) ->
%---------------------------------------------------------------------
-control_action(Command, Args) -> control_action(Command, node(), Args).
+control_action(Command, Args) ->
+ control_action(Command, node(), Args, default_options()).
-control_action(Command, Node, Args) ->
+control_action(Command, Args, NewOpts) ->
+ control_action(Command, node(), Args,
+ expand_options(default_options(), NewOpts)).
+
+control_action(Command, Node, Args, Opts) ->
case catch rabbit_control:action(
- Command, Node, Args,
+ Command, Node, Args, Opts,
fun (Format, Args1) ->
io:format(Format ++ " ...~n", Args1)
end) of
@@ -1241,13 +1279,28 @@ control_action(Command, Node, Args) ->
info_action(Command, Args, CheckVHost) ->
ok = control_action(Command, []),
- if CheckVHost -> ok = control_action(Command, ["-p", "/"]);
+ if CheckVHost -> ok = control_action(Command, []);
true -> ok
end,
ok = control_action(Command, lists:map(fun atom_to_list/1, Args)),
{bad_argument, dummy} = control_action(Command, ["dummy"]),
ok.
+default_options() -> [{"-s", "client"}, {"-p", "/"}, {"-q", "false"}].
+
+expand_options(As, Bs) ->
+ lists:foldl(fun({K, _}=A, R) ->
+ case proplists:is_defined(K, R) of
+ true -> R;
+ false -> [A | R]
+ end
+ end, Bs, As).
+
+check_get_options({ExpArgs, ExpOpts}, Defs, Args) ->
+ {ExpArgs, ResOpts} = rabbit_misc:get_options(Defs, Args),
+ true = lists:sort(ExpOpts) == lists:sort(ResOpts), % don't care about the order
+ ok.
+
empty_files(Files) ->
[case file:read_file_info(File) of
{ok, FInfo} -> FInfo#file_info.size == 0;
@@ -1735,7 +1788,7 @@ with_fresh_variable_queue(Fun) ->
{len, 0}]),
_ = rabbit_variable_queue:delete_and_terminate(Fun(VQ)),
passed.
-
+
test_variable_queue() ->
[passed = with_fresh_variable_queue(F) ||
F <- [fun test_variable_queue_dynamic_duration_change/1,