diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2010-08-05 15:56:46 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-08-05 15:56:46 +0100 |
| commit | 5e7719a0e55e57b59c2ef40ad41b8e03eb116120 (patch) | |
| tree | d4ed9b42d7f456ef8cb206a9c714515f23a6cc8c /src | |
| parent | 22a99dc7c4040a53c78d74baa29a0ede6f87b905 (diff) | |
| parent | fbdc8c26f8915d72548415e2ff20fe546968b2b8 (diff) | |
| download | rabbitmq-server-git-5e7719a0e55e57b59c2ef40ad41b8e03eb116120.tar.gz | |
merge default into bug23027
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_access_control.erl | 32 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 141 | ||||
| -rw-r--r-- | src/rabbit_event.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 37 | ||||
| -rw-r--r-- | src/rabbit_queue_collector.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_router.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 91 |
11 files changed, 236 insertions, 127 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 1fab7e4d82..41c628a071 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -90,9 +90,9 @@ {enables, kernel_ready}]}). -rabbit_boot_step({rabbit_event, - [{description, "statistics event handler"}, + [{description, "statistics event manager"}, {mfa, {rabbit_sup, start_restartable_child, - [gen_event, [{local, rabbit_event}]]}}, + [rabbit_event]}}, {requires, external_infrastructure}, {enables, kernel_ready}]}). diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 1dbda8058e..8d00f59124 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -38,7 +38,7 @@ -export([add_user/2, delete_user/1, change_password/2, list_users/0, lookup_user/1]). -export([add_vhost/1, delete_vhost/1, list_vhosts/0]). --export([set_permissions/5, clear_permissions/2, +-export([set_permissions/5, set_permissions/6, clear_permissions/2, list_vhost_permissions/1, list_user_permissions/1]). %%---------------------------------------------------------------------------- @@ -51,6 +51,7 @@ -type(username() :: binary()). -type(password() :: binary()). -type(regexp() :: binary()). +-type(scope() :: binary()). -spec(check_login/2 :: (binary(), binary()) -> rabbit_types:user() | @@ -78,6 +79,8 @@ -spec(list_vhosts/0 :: () -> [rabbit_types:vhost()]). -spec(set_permissions/5 ::(username(), rabbit_types:vhost(), regexp(), regexp(), regexp()) -> 'ok'). +-spec(set_permissions/6 ::(scope(), username(), rabbit_types:vhost(), + regexp(), regexp(), regexp()) -> 'ok'). -spec(clear_permissions/2 :: (username(), rabbit_types:vhost()) -> 'ok'). -spec(list_vhost_permissions/1 :: (rabbit_types:vhost()) @@ -157,6 +160,7 @@ check_vhost_access(#user{username = Username}, VHostPath) -> [VHostPath, Username]) end. +permission_index(scope) -> #permission.scope; permission_index(configure) -> #permission.configure; permission_index(write) -> #permission.write; permission_index(read) -> #permission.read. @@ -169,7 +173,7 @@ check_resource_access(Username, Permission); check_resource_access(_Username, #resource{name = <<"amq.gen",_/binary>>}, - _Permission) -> + #permission{scope = client}) -> ok; check_resource_access(Username, R = #resource{virtual_host = VHostPath, name = Name}, @@ -300,7 +304,7 @@ internal_delete_vhost(VHostPath) -> ok = rabbit_exchange:delete(Name, false) end, rabbit_exchange:list(VHostPath)), - lists:foreach(fun ({Username, _, _, _}) -> + lists:foreach(fun ({Username, _, _, _, _}) -> ok = clear_permissions(Username, VHostPath) end, list_vhost_permissions(VHostPath)), @@ -318,7 +322,16 @@ validate_regexp(RegexpBin) -> end. set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> + set_permissions(<<"client">>, Username, VHostPath, ConfigurePerm, + WritePerm, ReadPerm). + +set_permissions(ScopeBin, Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]), + Scope = case ScopeBin of + <<"client">> -> client; + <<"all">> -> all; + _ -> throw({error, {invalid_scope, ScopeBin}}) + end, rabbit_misc:execute_mnesia_transaction( rabbit_misc:with_user_and_vhost( Username, VHostPath, @@ -328,12 +341,14 @@ set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> username = Username, virtual_host = VHostPath}, permission = #permission{ + scope = Scope, configure = ConfigurePerm, write = WritePerm, read = ReadPerm}}, write) end)). + clear_permissions(Username, VHostPath) -> rabbit_misc:execute_mnesia_transaction( rabbit_misc:with_user_and_vhost( @@ -345,22 +360,23 @@ clear_permissions(Username, VHostPath) -> end)). list_vhost_permissions(VHostPath) -> - [{Username, ConfigurePerm, WritePerm, ReadPerm} || - {Username, _, ConfigurePerm, WritePerm, ReadPerm} <- + [{Username, ConfigurePerm, WritePerm, ReadPerm, Scope} || + {Username, _, ConfigurePerm, WritePerm, ReadPerm, Scope} <- list_permissions(rabbit_misc:with_vhost( VHostPath, match_user_vhost('_', VHostPath)))]. list_user_permissions(Username) -> - [{VHostPath, ConfigurePerm, WritePerm, ReadPerm} || - {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <- + [{VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} || + {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} <- list_permissions(rabbit_misc:with_user( Username, match_user_vhost(Username, '_')))]. list_permissions(QueryThunk) -> - [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} || + [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} || #user_permission{user_vhost = #user_vhost{username = Username, virtual_host = VHostPath}, permission = #permission{ + scope = Scope, configure = ConfigurePerm, write = WritePerm, read = ReadPerm}} <- diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index d4226331c3..b55d5b210c 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -355,7 +355,7 @@ consumers_all(VHostPath) -> stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity). -emit_stats(#amqqueue{pid = QPid}) -> +emit_stats(#amqqueue{pid = QPid}) -> delegate_pcast(QPid, 7, emit_stats). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> @@ -386,7 +386,6 @@ reject(QPid, MsgIds, Requeue, ChPid) -> commit_all(QPids, Txn, ChPid) -> safe_delegate_call_ok( - fun (QPid) -> exit({queue_disappeared, QPid}) end, fun (QPid) -> gen_server2:call(QPid, {commit, Txn, ChPid}, infinity) end, QPids). @@ -396,9 +395,6 @@ rollback_all(QPids, Txn, ChPid) -> notify_down_all(QPids, ChPid) -> safe_delegate_call_ok( - %% we don't care if the queue process has terminated in the - %% meantime - fun (_) -> ok end, fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, infinity) end, QPids). @@ -493,11 +489,11 @@ pseudo_queue(QueueName, Pid) -> arguments = [], pid = Pid}. -safe_delegate_call_ok(H, F, Pids) -> +safe_delegate_call_ok(F, Pids) -> {_, Bad} = delegate:invoke(Pids, fun (Pid) -> rabbit_misc:with_exit_handler( - fun () -> H(Pid) end, + fun () -> ok end, fun () -> F(Pid) end) end), case Bad of diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5e1b1f716c..d52660c5ac 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -241,7 +241,7 @@ stop_rate_timer(State = #q{rate_timer_ref = just_measured}) -> stop_rate_timer(State = #q{rate_timer_ref = TRef}) -> {ok, cancel} = timer:cancel(TRef), State#q{rate_timer_ref = undefined}. - + stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) -> State; stop_expiry_timer(State = #q{expiry_timer_ref = TRef}) -> @@ -887,7 +887,7 @@ handle_cast(maybe_expire, State) -> {stop, normal, State}; false -> noreply(ensure_expiry_timer(State)) end; - + handle_cast(emit_stats, State) -> emit_stats(State), noreply(State). diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 6e6ad06cb3..f0b623c2dc 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -32,20 +32,25 @@ -module(rabbit_control). -include("rabbit.hrl"). --export([start/0, stop/0, action/4]). - --record(params, {quiet, node, command, args}). +-export([start/0, stop/0, action/5]). -define(RPC_TIMEOUT, infinity). +-define(QUIET_OPT, "-q"). +-define(NODE_OPT, "-n"). +-define(VHOST_OPT, "-p"). +-define(SCOPE_OPT, "-s"). + %%---------------------------------------------------------------------------- -ifdef(use_specs). -spec(start/0 :: () -> no_return()). -spec(stop/0 :: () -> 'ok'). --spec(action/4 :: (atom(), node(), [string()], - fun ((string(), [any()]) -> 'ok')) -> 'ok'). +-spec(action/5 :: + (atom(), node(), [string()], [{string(), any()}], + fun ((string(), [any()]) -> 'ok')) + -> 'ok'). -spec(usage/0 :: () -> no_return()). -endif. @@ -55,18 +60,33 @@ start() -> {ok, [[NodeStr|_]|_]} = init:get_argument(nodename), FullCommand = init:get_plain_arguments(), - #params{quiet = Quiet, node = Node, command = Command, args = Args} = - parse_args(FullCommand, #params{quiet = false, - node = rabbit_misc:makenode(NodeStr)}), + case FullCommand of + [] -> usage(); + _ -> ok + end, + {[Command0 | Args], Opts} = + rabbit_misc:get_options( + [{flag, ?QUIET_OPT}, {option, ?NODE_OPT, NodeStr}, + {option, ?VHOST_OPT, "/"}, {option, ?SCOPE_OPT, "client"}], + FullCommand), + Opts1 = lists:map(fun({K, V}) -> + case K of + ?NODE_OPT -> {?NODE_OPT, rabbit_misc:makenode(V)}; + _ -> {K, V} + end + end, Opts), + Command = list_to_atom(Command0), + Quiet = proplists:get_bool(?QUIET_OPT, Opts1), + Node = proplists:get_value(?NODE_OPT, Opts1), Inform = case Quiet of true -> fun (_Format, _Args1) -> ok end; false -> fun (Format, Args1) -> io:format(Format ++ " ...~n", Args1) - end + end end, %% The reason we don't use a try/catch here is that rpc:call turns %% thrown errors into normal return values - case catch action(Command, Node, Args, Inform) of + case catch action(Command, Node, Args, Opts, Inform) of ok -> case Quiet of true -> ok; @@ -118,15 +138,6 @@ print_badrpc_diagnostics(Node) -> fmt_stderr("- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]), ok. -parse_args(["-n", NodeS | Args], Params) -> - parse_args(Args, Params#params{node = rabbit_misc:makenode(NodeS)}); -parse_args(["-q" | Args], Params) -> - parse_args(Args, Params#params{quiet = true}); -parse_args([Command | Args], Params) -> - Params#params{command = list_to_atom(Command), args = Args}; -parse_args([], _) -> - usage(). - stop() -> ok. @@ -134,39 +145,39 @@ usage() -> io:format("~s", [rabbit_ctl_usage:usage()]), halt(1). -action(stop, Node, [], Inform) -> +action(stop, Node, [], _Opts, Inform) -> Inform("Stopping and halting node ~p", [Node]), call(Node, {rabbit, stop_and_halt, []}); -action(stop_app, Node, [], Inform) -> +action(stop_app, Node, [], _Opts, Inform) -> Inform("Stopping node ~p", [Node]), call(Node, {rabbit, stop, []}); -action(start_app, Node, [], Inform) -> +action(start_app, Node, [], _Opts, Inform) -> Inform("Starting node ~p", [Node]), call(Node, {rabbit, start, []}); -action(reset, Node, [], Inform) -> +action(reset, Node, [], _Opts, Inform) -> Inform("Resetting node ~p", [Node]), call(Node, {rabbit_mnesia, reset, []}); -action(force_reset, Node, [], Inform) -> +action(force_reset, Node, [], _Opts, Inform) -> Inform("Forcefully resetting node ~p", [Node]), call(Node, {rabbit_mnesia, force_reset, []}); -action(cluster, Node, ClusterNodeSs, Inform) -> +action(cluster, Node, ClusterNodeSs, _Opts, Inform) -> ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs), Inform("Clustering node ~p with ~p", [Node, ClusterNodes]), rpc_call(Node, rabbit_mnesia, cluster, [ClusterNodes]); -action(force_cluster, Node, ClusterNodeSs, Inform) -> +action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) -> ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs), Inform("Forcefully clustering node ~p with ~p (ignoring offline nodes)", [Node, ClusterNodes]), rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]); -action(status, Node, [], Inform) -> +action(status, Node, [], _Opts, Inform) -> Inform("Status of node ~p", [Node]), case call(Node, {rabbit, status, []}) of {badrpc, _} = Res -> Res; @@ -174,129 +185,117 @@ action(status, Node, [], Inform) -> ok end; -action(rotate_logs, Node, [], Inform) -> +action(rotate_logs, Node, [], _Opts, Inform) -> Inform("Reopening logs for node ~p", [Node]), call(Node, {rabbit, rotate_logs, [""]}); -action(rotate_logs, Node, Args = [Suffix], Inform) -> +action(rotate_logs, Node, Args = [Suffix], _Opts, Inform) -> Inform("Rotating logs to files with suffix ~p", [Suffix]), call(Node, {rabbit, rotate_logs, Args}); -action(close_connection, Node, [PidStr, Explanation], Inform) -> +action(close_connection, Node, [PidStr, Explanation], _Opts, Inform) -> Inform("Closing connection ~s", [PidStr]), rpc_call(Node, rabbit_networking, close_connection, [rabbit_misc:string_to_pid(PidStr), Explanation]); -action(add_user, Node, Args = [Username, _Password], Inform) -> +action(add_user, Node, Args = [Username, _Password], _Opts, Inform) -> Inform("Creating user ~p", [Username]), call(Node, {rabbit_access_control, add_user, Args}); -action(delete_user, Node, Args = [_Username], Inform) -> +action(delete_user, Node, Args = [_Username], _Opts, Inform) -> Inform("Deleting user ~p", Args), call(Node, {rabbit_access_control, delete_user, Args}); -action(change_password, Node, Args = [Username, _Newpassword], Inform) -> +action(change_password, Node, Args = [Username, _Newpassword], _Opts, Inform) -> Inform("Changing password for user ~p", [Username]), call(Node, {rabbit_access_control, change_password, Args}); -action(list_users, Node, [], Inform) -> +action(list_users, Node, [], _Opts, Inform) -> Inform("Listing users", []), display_list(call(Node, {rabbit_access_control, list_users, []})); -action(add_vhost, Node, Args = [_VHostPath], Inform) -> +action(add_vhost, Node, Args = [_VHostPath], _Opts, Inform) -> Inform("Creating vhost ~p", Args), call(Node, {rabbit_access_control, add_vhost, Args}); -action(delete_vhost, Node, Args = [_VHostPath], Inform) -> +action(delete_vhost, Node, Args = [_VHostPath], _Opts, Inform) -> Inform("Deleting vhost ~p", Args), call(Node, {rabbit_access_control, delete_vhost, Args}); -action(list_vhosts, Node, [], Inform) -> +action(list_vhosts, Node, [], _Opts, Inform) -> Inform("Listing vhosts", []), display_list(call(Node, {rabbit_access_control, list_vhosts, []})); -action(list_user_permissions, Node, Args = [_Username], Inform) -> +action(list_user_permissions, Node, Args = [_Username], _Opts, Inform) -> Inform("Listing permissions for user ~p", Args), display_list(call(Node, {rabbit_access_control, list_user_permissions, Args})); -action(list_queues, Node, Args, Inform) -> +action(list_queues, Node, Args, Opts, Inform) -> Inform("Listing queues", []), - {VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args), - ArgAtoms = default_if_empty(RemainingArgs, [name, messages]), + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + ArgAtoms = default_if_empty(Args, [name, messages]), display_info_list(rpc_call(Node, rabbit_amqqueue, info_all, [VHostArg, ArgAtoms]), ArgAtoms); -action(list_exchanges, Node, Args, Inform) -> +action(list_exchanges, Node, Args, Opts, Inform) -> Inform("Listing exchanges", []), - {VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args), - ArgAtoms = default_if_empty(RemainingArgs, [name, type]), + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + ArgAtoms = default_if_empty(Args, [name, type]), display_info_list(rpc_call(Node, rabbit_exchange, info_all, [VHostArg, ArgAtoms]), ArgAtoms); -action(list_bindings, Node, Args, Inform) -> +action(list_bindings, Node, _Args, Opts, Inform) -> Inform("Listing bindings", []), - {VHostArg, _} = parse_vhost_flag_bin(Args), + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), InfoKeys = [exchange_name, queue_name, routing_key, args], display_info_list( [lists:zip(InfoKeys, tuple_to_list(X)) || X <- rpc_call(Node, rabbit_exchange, list_bindings, [VHostArg])], InfoKeys); -action(list_connections, Node, Args, Inform) -> +action(list_connections, Node, Args, _Opts, Inform) -> Inform("Listing connections", []), ArgAtoms = default_if_empty(Args, [user, peer_address, peer_port, state]), display_info_list(rpc_call(Node, rabbit_networking, connection_info_all, [ArgAtoms]), ArgAtoms); -action(list_channels, Node, Args, Inform) -> +action(list_channels, Node, Args, _Opts, Inform) -> Inform("Listing channels", []), ArgAtoms = default_if_empty(Args, [pid, user, transactional, consumer_count, messages_unacknowledged]), display_info_list(rpc_call(Node, rabbit_channel, info_all, [ArgAtoms]), ArgAtoms); -action(list_consumers, Node, Args, Inform) -> +action(list_consumers, Node, _Args, Opts, Inform) -> Inform("Listing consumers", []), - {VHostArg, _} = parse_vhost_flag_bin(Args), + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), InfoKeys = [queue_name, channel_pid, consumer_tag, ack_required], display_info_list( [lists:zip(InfoKeys, tuple_to_list(X)) || X <- rpc_call(Node, rabbit_amqqueue, consumers_all, [VHostArg])], InfoKeys); -action(Command, Node, Args, Inform) -> - {VHost, RemainingArgs} = parse_vhost_flag(Args), - action(Command, Node, VHost, RemainingArgs, Inform). - -action(set_permissions, Node, VHost, [Username, CPerm, WPerm, RPerm], Inform) -> +action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) -> + VHost = proplists:get_value(?VHOST_OPT, Opts), + Scope = proplists:get_value(?SCOPE_OPT, Opts), Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]), call(Node, {rabbit_access_control, set_permissions, - [Username, VHost, CPerm, WPerm, RPerm]}); + [Scope, Username, VHost, CPerm, WPerm, RPerm]}); -action(clear_permissions, Node, VHost, [Username], Inform) -> +action(clear_permissions, Node, [Username], Opts, Inform) -> + VHost = proplists:get_value(?VHOST_OPT, Opts), Inform("Clearing permissions for user ~p in vhost ~p", [Username, VHost]), call(Node, {rabbit_access_control, clear_permissions, [Username, VHost]}); -action(list_permissions, Node, VHost, [], Inform) -> +action(list_permissions, Node, [], Opts, Inform) -> + VHost = proplists:get_value(?VHOST_OPT, Opts), Inform("Listing permissions in vhost ~p", [VHost]), display_list(call(Node, {rabbit_access_control, list_vhost_permissions, [VHost]})). -parse_vhost_flag(Args) when is_list(Args) -> - case Args of - ["-p", VHost | RemainingArgs] -> - {VHost, RemainingArgs}; - RemainingArgs -> - {"/", RemainingArgs} - end. - -parse_vhost_flag_bin(Args) -> - {VHost, RemainingArgs} = parse_vhost_flag(Args), - {list_to_binary(VHost), RemainingArgs}. - default_if_empty(List, Default) when is_list(List) -> if List == [] -> Default; @@ -357,6 +356,8 @@ rpc_call(Node, Mod, Fun, Args) -> %% characters. We don't escape characters above 127, since they may %% form part of UTF-8 strings. +escape(Atom) when is_atom(Atom) -> + escape(atom_to_list(Atom)); escape(Bin) when is_binary(Bin) -> escape(binary_to_list(Bin)); escape(L) when is_list(L) -> diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index 07027cd5c6..113ffcb4bc 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -33,6 +33,7 @@ -include("rabbit.hrl"). +-export([start_link/0]). -export([init_stats_timer/0, ensure_stats_timer/3, stop_stats_timer/2]). -export([ensure_stats_timer_after/2, reset_stats_timer_after/1]). -export([stats_level/1]). @@ -68,6 +69,7 @@ -type(timer_fun() :: fun (() -> 'ok')). +-spec(start_link/0 :: () -> rabbit_types:ok_or_error2(pid(), any())). -spec(init_stats_timer/0 :: () -> state()). -spec(ensure_stats_timer/3 :: (state(), timer_fun(), timer_fun()) -> state()). -spec(stop_stats_timer/2 :: (state(), timer_fun()) -> state()). @@ -80,6 +82,9 @@ %%---------------------------------------------------------------------------- +start_link() -> + gen_event:start_link({local, ?MODULE}). + init_stats_timer() -> {ok, StatsLevel} = application:get_env(rabbit, collect_statistics), #state{level = StatsLevel, timer = undefined}. @@ -120,9 +125,11 @@ stats_level(#state{level = Level}) -> notify(Type, Props) -> try + %% TODO: switch to os:timestamp() when we drop support for + %% Erlang/OTP < R13B01 gen_event:notify(rabbit_event, #event{type = Type, props = Props, - timestamp = os:timestamp()}) + timestamp = now()}) catch error:badarg -> %% badarg means rabbit_event is no longer registered. We never %% unregister it so the great likelihood is that we're shutting diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 3b665d387e..638596b724 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -64,6 +64,7 @@ -export([version_compare/2, version_compare/3]). -export([recursive_delete/1, dict_cons/3, orddict_cons/3, unlink_and_capture_exit/1]). +-export([get_options/2]). -import(mnesia). -import(lists). @@ -79,6 +80,7 @@ -type(ok_or_error() :: rabbit_types:ok_or_error(any())). -type(thunk(T) :: fun(() -> T)). -type(resource_name() :: binary()). +-type(optdef() :: {flag, string()} | {option, string(), any()}). -type(channel_or_connection_exit() :: rabbit_types:channel_exit() | rabbit_types:connection_exit()). @@ -185,6 +187,8 @@ -spec(orddict_cons/3 :: (any(), any(), orddict:dictionary()) -> orddict:dictionary()). -spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok'). +-spec(get_options/2 :: ([optdef()], [string()]) + -> {[string()], [{string(), any()}]}). -endif. @@ -704,3 +708,36 @@ unlink_and_capture_exit(Pid) -> receive {'EXIT', Pid, _} -> ok after 0 -> ok end. + +% Separate flags and options from arguments. +% get_options([{flag, "-q"}, {option, "-p", "/"}], +% ["set_permissions","-p","/","guest", +% "-q",".*",".*",".*"]) +% == {["set_permissions","guest",".*",".*",".*"], +% [{"-q",true},{"-p","/"}]} +get_options(Defs, As) -> + lists:foldl(fun(Def, {AsIn, RsIn}) -> + {AsOut, Value} = case Def of + {flag, Key} -> + get_flag(Key, AsIn); + {option, Key, Default} -> + get_option(Key, Default, AsIn) + end, + {AsOut, [{Key, Value} | RsIn]} + end, {As, []}, Defs). + +get_option(K, _Default, [K, V | As]) -> + {As, V}; +get_option(K, Default, [Nk | As]) -> + {As1, V} = get_option(K, Default, As), + {[Nk | As1], V}; +get_option(_, Default, As) -> + {As, Default}. + +get_flag(K, [K | As]) -> + {As, true}; +get_flag(K, [Nk | As]) -> + {As1, V} = get_flag(K, As), + {[Nk | As1], V}; +get_flag(_, []) -> + {[], false}. diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl index ea3768d4b4..9257ec82ab 100644 --- a/src/rabbit_queue_collector.erl +++ b/src/rabbit_queue_collector.erl @@ -49,6 +49,7 @@ -spec(start_link/0 :: () -> rabbit_types:ok(pid())). -spec(register/2 :: (pid(), rabbit_types:amqqueue()) -> 'ok'). -spec(delete_all/1 :: (pid()) -> 'ok'). +-spec(shutdown/1 :: (pid()) -> 'ok'). -endif. @@ -64,7 +65,7 @@ delete_all(CollectorPid) -> gen_server:call(CollectorPid, delete_all, infinity). shutdown(CollectorPid) -> - gen_server:call(CollectorPid, shutdown, infinity). + gen_server:cast(CollectorPid, shutdown). %%---------------------------------------------------------------------------- @@ -87,13 +88,10 @@ handle_call(delete_all, _From, State = #state{queues = Queues}) -> rabbit_amqqueue:delete(Q, false, false) end) || {MonitorRef, Q} <- dict:to_list(Queues)], - {reply, ok, State}; + {reply, ok, State}. -handle_call(shutdown, _From, State) -> - {stop, normal, ok, State}. - -handle_cast(_Msg, State) -> - {noreply, State}. +handle_cast(shutdown, State) -> + {stop, normal, State}. handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason}, State = #state{queues = Queues}) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index a0e33ed08e..57c2399058 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -308,8 +308,8 @@ start_connection(Parent, Deb, Sock, SockTransform) -> %% %% gen_tcp:close(ClientSock), teardown_profiling(ProfilingValue), - rabbit_queue_collector:shutdown(Collector), rabbit_misc:unlink_and_capture_exit(Collector), + rabbit_queue_collector:shutdown(Collector), rabbit_event:notify(connection_closed, [{pid, self()}]) end, done. @@ -422,8 +422,14 @@ internal_conserve_memory(false, State = #v1{connection_state = blocked, internal_conserve_memory(_Conserve, State) -> State. -close_connection(State = #v1{connection = #connection{ +close_connection(State = #v1{queue_collector = Collector, + connection = #connection{ timeout_sec = TimeoutSec}}) -> + %% The spec says "Exclusive queues may only be accessed by the + %% current connection, and are deleted when that connection + %% closes." This does not strictly imply synchrony, but in + %% practice it seems to be what people assume. + rabbit_queue_collector:delete_all(Collector), %% We terminate the connection after the specified interval, but %% no later than ?CLOSING_TIMEOUT seconds. TimeoutMillisec = @@ -499,18 +505,13 @@ wait_for_channel_termination(N, TimerRef) -> end. maybe_close(State = #v1{connection_state = closing, - queue_collector = Collector, connection = #connection{protocol = Protocol}, sock = Sock}) -> case all_channels() of [] -> - %% Spec says "Exclusive queues may only be accessed by the current - %% connection, and are deleted when that connection closes." - %% This does not strictly imply synchrony, but in practice it seems - %% to be what people assume. - rabbit_queue_collector:delete_all(Collector), + NewState = close_connection(State), ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol), - close_connection(State); + NewState; _ -> State end; maybe_close(State) -> diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index d50b9f3126..ec049a1a2c 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -69,8 +69,8 @@ deliver(QPids, Delivery = #delivery{mandatory = false, deliver(QPids, Delivery) -> {Success, _} = delegate:invoke(QPids, - fun (Pid) -> - rabbit_amqqueue:deliver(Pid, Delivery) + fun (Pid) -> + rabbit_amqqueue:deliver(Pid, Delivery) end), {Routed, Handled} = lists:foldl(fun fold_deliveries/2, {false, []}, Success), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 923c3403e1..6812b8d4fe 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -68,6 +68,7 @@ all_tests() -> passed = test_app_management(), passed = test_log_management_during_startup(), passed = test_statistics(), + passed = test_option_parser(), passed = test_cluster_management(), passed = test_user_management(), passed = test_server_status(), @@ -725,6 +726,30 @@ test_log_management_during_startup() -> ok = control_action(start_app, []), passed. +test_option_parser() -> + % command and arguments should just pass through + ok = check_get_options({["mock_command", "arg1", "arg2"], []}, + [], ["mock_command", "arg1", "arg2"]), + + % get flags + ok = check_get_options( + {["mock_command", "arg1"], [{"-f", true}, {"-f2", false}]}, + [{flag, "-f"}, {flag, "-f2"}], ["mock_command", "arg1", "-f"]), + + % get options + ok = check_get_options( + {["mock_command"], [{"-foo", "bar"}, {"-baz", "notbaz"}]}, + [{option, "-foo", "notfoo"}, {option, "-baz", "notbaz"}], + ["mock_command", "-foo", "bar"]), + + % shuffled and interleaved arguments and options + ok = check_get_options( + {["a1", "a2", "a3"], [{"-o1", "hello"}, {"-o2", "noto2"}, {"-f", true}]}, + [{option, "-o1", "noto1"}, {flag, "-f"}, {option, "-o2", "noto2"}], + ["-f", "a1", "-o1", "hello", "a2", "a3"]), + + passed. + test_cluster_management() -> %% 'cluster' and 'reset' should only work if the app is stopped @@ -860,7 +885,7 @@ test_cluster_management2(SecondaryNode) -> %% attempt to leave cluster when no other node is alive ok = control_action(cluster, [SecondaryNodeS, NodeS]), ok = control_action(start_app, []), - ok = control_action(stop_app, SecondaryNode, []), + ok = control_action(stop_app, SecondaryNode, [], []), ok = control_action(stop_app, []), {error, {no_running_cluster_nodes, _, _}} = control_action(reset, []), @@ -868,9 +893,9 @@ test_cluster_management2(SecondaryNode) -> %% leave system clustered, with the secondary node as a ram node ok = control_action(force_reset, []), ok = control_action(start_app, []), - ok = control_action(force_reset, SecondaryNode, []), - ok = control_action(cluster, SecondaryNode, [NodeS]), - ok = control_action(start_app, SecondaryNode, []), + ok = control_action(force_reset, SecondaryNode, [], []), + ok = control_action(cluster, SecondaryNode, [NodeS], []), + ok = control_action(start_app, SecondaryNode, [], []), passed. @@ -890,9 +915,12 @@ test_user_management() -> {error, {no_such_user, _}} = control_action(list_user_permissions, ["foo"]), {error, {no_such_vhost, _}} = - control_action(list_permissions, ["-p", "/testhost"]), + control_action(list_permissions, [], [{"-p", "/testhost"}]), {error, {invalid_regexp, _, _}} = control_action(set_permissions, ["guest", "+foo", ".*", ".*"]), + {error, {invalid_scope, _}} = + control_action(set_permissions, ["guest", "foo", ".*", ".*"], + [{"-s", "cilent"}]), %% user creation ok = control_action(add_user, ["foo", "bar"]), @@ -908,16 +936,21 @@ test_user_management() -> ok = control_action(list_vhosts, []), %% user/vhost mapping - ok = control_action(set_permissions, ["-p", "/testhost", - "foo", ".*", ".*", ".*"]), - ok = control_action(set_permissions, ["-p", "/testhost", - "foo", ".*", ".*", ".*"]), - ok = control_action(list_permissions, ["-p", "/testhost"]), + ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"], + [{"-p", "/testhost"}]), + ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"], + [{"-p", "/testhost"}]), + ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"], + [{"-p", "/testhost"}, {"-s", "client"}]), + ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"], + [{"-p", "/testhost"}, {"-s", "all"}]), + ok = control_action(list_permissions, [], [{"-p", "/testhost"}]), + ok = control_action(list_permissions, [], [{"-p", "/testhost"}]), ok = control_action(list_user_permissions, ["foo"]), %% user/vhost unmapping - ok = control_action(clear_permissions, ["-p", "/testhost", "foo"]), - ok = control_action(clear_permissions, ["-p", "/testhost", "foo"]), + ok = control_action(clear_permissions, ["foo"], [{"-p", "/testhost"}]), + ok = control_action(clear_permissions, ["foo"], [{"-p", "/testhost"}]), %% vhost deletion ok = control_action(delete_vhost, ["/testhost"]), @@ -926,8 +959,8 @@ test_user_management() -> %% deleting a populated vhost ok = control_action(add_vhost, ["/testhost"]), - ok = control_action(set_permissions, ["-p", "/testhost", - "foo", ".*", ".*", ".*"]), + ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"], + [{"-p", "/testhost"}]), ok = control_action(delete_vhost, ["/testhost"]), %% user deletion @@ -1223,11 +1256,16 @@ test_delegates_sync(SecondaryNode) -> %--------------------------------------------------------------------- -control_action(Command, Args) -> control_action(Command, node(), Args). +control_action(Command, Args) -> + control_action(Command, node(), Args, default_options()). -control_action(Command, Node, Args) -> +control_action(Command, Args, NewOpts) -> + control_action(Command, node(), Args, + expand_options(default_options(), NewOpts)). + +control_action(Command, Node, Args, Opts) -> case catch rabbit_control:action( - Command, Node, Args, + Command, Node, Args, Opts, fun (Format, Args1) -> io:format(Format ++ " ...~n", Args1) end) of @@ -1241,13 +1279,28 @@ control_action(Command, Node, Args) -> info_action(Command, Args, CheckVHost) -> ok = control_action(Command, []), - if CheckVHost -> ok = control_action(Command, ["-p", "/"]); + if CheckVHost -> ok = control_action(Command, []); true -> ok end, ok = control_action(Command, lists:map(fun atom_to_list/1, Args)), {bad_argument, dummy} = control_action(Command, ["dummy"]), ok. +default_options() -> [{"-s", "client"}, {"-p", "/"}, {"-q", "false"}]. + +expand_options(As, Bs) -> + lists:foldl(fun({K, _}=A, R) -> + case proplists:is_defined(K, R) of + true -> R; + false -> [A | R] + end + end, Bs, As). + +check_get_options({ExpArgs, ExpOpts}, Defs, Args) -> + {ExpArgs, ResOpts} = rabbit_misc:get_options(Defs, Args), + true = lists:sort(ExpOpts) == lists:sort(ResOpts), % don't care about the order + ok. + empty_files(Files) -> [case file:read_file_info(File) of {ok, FInfo} -> FInfo#file_info.size == 0; @@ -1735,7 +1788,7 @@ with_fresh_variable_queue(Fun) -> {len, 0}]), _ = rabbit_variable_queue:delete_and_terminate(Fun(VQ)), passed. - + test_variable_queue() -> [passed = with_fresh_variable_queue(F) || F <- [fun test_variable_queue_dynamic_duration_change/1, |
