diff options
| author | Daniil Fedotov <dfedotov@pivotal.io> | 2016-08-24 15:41:03 +0100 |
|---|---|---|
| committer | Daniil Fedotov <dfedotov@pivotal.io> | 2016-08-24 15:52:50 +0100 |
| commit | 086fd01ae1356c358bbc2ebc0f02a375b8cd618b (patch) | |
| tree | a3c707f531c0af19d354f82bf2db0c9de26810f6 | |
| parent | f057a8c90c103eb4d7012df47478fbfb51988f00 (diff) | |
| download | rabbitmq-server-git-086fd01ae1356c358bbc2ebc0f02a375b8cd618b.tar.gz | |
operator policies
| -rw-r--r-- | src/rabbit_control_main.erl | 42 | ||||
| -rw-r--r-- | src/rabbit_policy.erl | 141 |
2 files changed, 139 insertions, 44 deletions
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index c96f662dda..13ca805bfd 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -285,14 +285,14 @@ action(start_app, Node, [], _Opts, Inform) -> action(reset, Node, [], _Opts, Inform) -> Inform("Resetting node ~p", [Node]), - require_mnesia_stopped(Node, + require_mnesia_stopped(Node, fun() -> call(Node, {rabbit_mnesia, reset, []}) end); action(force_reset, Node, [], _Opts, Inform) -> Inform("Forcefully resetting node ~p", [Node]), - require_mnesia_stopped(Node, + require_mnesia_stopped(Node, fun() -> call(Node, {rabbit_mnesia, force_reset, []}) end); @@ -304,21 +304,21 @@ action(join_cluster, Node, [ClusterNodeS], Opts, Inform) -> false -> disc end, Inform("Clustering node ~p with ~p", [Node, ClusterNode]), - require_mnesia_stopped(Node, + require_mnesia_stopped(Node, fun() -> rpc_call(Node, rabbit_mnesia, join_cluster, [ClusterNode, NodeType]) end); action(change_cluster_node_type, Node, ["ram"], _Opts, Inform) -> Inform("Turning ~p into a ram node", [Node]), - require_mnesia_stopped(Node, + require_mnesia_stopped(Node, fun() -> rpc_call(Node, rabbit_mnesia, change_cluster_node_type, [ram]) end); action(change_cluster_node_type, Node, [Type], _Opts, Inform) when Type =:= "disc" orelse Type =:= "disk" -> Inform("Turning ~p into a disc node", [Node]), - require_mnesia_stopped(Node, + require_mnesia_stopped(Node, fun() -> rpc_call(Node, rabbit_mnesia, change_cluster_node_type, [disc]) end); @@ -326,7 +326,7 @@ action(change_cluster_node_type, Node, [Type], _Opts, Inform) action(update_cluster_nodes, Node, [ClusterNodeS], _Opts, Inform) -> ClusterNode = list_to_atom(ClusterNodeS), Inform("Updating cluster nodes for ~p from ~p", [Node, ClusterNode]), - require_mnesia_stopped(Node, + require_mnesia_stopped(Node, fun() -> rpc_call(Node, rabbit_mnesia, update_cluster_nodes, [ClusterNode]) end); @@ -488,9 +488,9 @@ action(set_disk_free_limit, Node, ["mem_relative", Arg], _Opts, Inform) -> _ -> Arg end), Inform("Setting disk free limit on ~p to ~p of total RAM", [Node, Frac]), - rpc_call(Node, - rabbit_disk_monitor, - set_disk_free_limit, + rpc_call(Node, + rabbit_disk_monitor, + set_disk_free_limit, [{mem_relative, Frac}]); @@ -544,6 +544,28 @@ action(clear_policy, Node, [Key], Opts, Inform) -> Inform("Clearing policy ~p", [Key]), rpc_call(Node, rabbit_policy, delete, [VHostArg, list_to_binary(Key)]); +action(set_operator_policy, Node, [Key, Pattern, Defn], Opts, Inform) -> + Msg = "Setting operator policy override ~p for pattern ~p to ~p with priority ~p", + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + PriorityArg = proplists:get_value(?PRIORITY_OPT, Opts), + ApplyToArg = list_to_binary(proplists:get_value(?APPLY_TO_OPT, Opts)), + Inform(Msg, [Key, Pattern, Defn, PriorityArg]), + Res = rpc_call( + Node, rabbit_policy, parse_set_op, + [VHostArg, list_to_binary(Key), Pattern, Defn, PriorityArg, ApplyToArg]), + case Res of + {error, Format, Args} when is_list(Format) andalso is_list(Args) -> + {error_string, rabbit_misc:format(Format, Args)}; + _ -> + Res + end; + +action(clear_operator_policy, Node, [Key], Opts, Inform) -> + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + Inform("Clearing operator policy ~p", [Key]), + rpc_call(Node, rabbit_policy, delete_op, [VHostArg, list_to_binary(Key)]); + + action(report, Node, _Args, _Opts, Inform) -> Inform("Reporting server status on ~p~n~n", [erlang:universaltime()]), [begin ok = action(Action, N, [], [], Inform), io:nl() end || @@ -973,7 +995,7 @@ escape(Bin, IsEscaped) when is_binary(Bin) -> escape(L, false) when is_list(L) -> escape_char(lists:reverse(L), []); escape(L, true) when is_list(L) -> - L. + L. escape_char([$\\ | T], Acc) -> escape_char(T, [$\\, $\\ | Acc]); diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index a9caadf972..18d5c44261 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -46,6 +46,8 @@ -export([validate/5, notify/4, notify_clear/3]). -export([parse_set/6, set/6, delete/2, lookup/2, list/0, list/1, list_formatted/1, list_formatted/3, info_keys/0]). +-export([parse_set_op/6, set_op/6, delete_op/2, lookup_op/2, list_op/0, list_op/1, + list_formatted_op/1, list_formatted_op/3]). -rabbit_boot_step({?MODULE, [{description, "policy parameters"}, @@ -65,13 +67,14 @@ name0(Policy) -> pget(name, Policy). set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)}; set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}. -set0(Name = #resource{virtual_host = VHost}) -> match(Name, list(VHost)). +set0(Name = #resource{virtual_host = VHost}) -> + match(Name, list(VHost), list_op(VHost)). get(Name, #amqqueue{policy = Policy}) -> get0(Name, Policy); get(Name, #exchange{policy = Policy}) -> get0(Name, Policy); %% Caution - SLOW. get(Name, EntityName = #resource{virtual_host = VHost}) -> - get0(Name, match(EntityName, list(VHost))). + get0(Name, match(EntityName, list(VHost), list_op(VHost))). get0(_Name, undefined) -> undefined; get0(Name, List) -> case pget(definition, List) of @@ -112,19 +115,20 @@ recover0() -> Xs = mnesia:dirty_match_object(rabbit_durable_exchange, #exchange{_ = '_'}), Qs = mnesia:dirty_match_object(rabbit_durable_queue, #amqqueue{_ = '_'}), Policies = list(), + OpPolicies = list_op(), [rabbit_misc:execute_mnesia_transaction( fun () -> mnesia:write( rabbit_durable_exchange, rabbit_exchange_decorator:set( - X#exchange{policy = match(Name, Policies)}), write) + X#exchange{policy = match(Name, Policies, OpPolicies)}), write) end) || X = #exchange{name = Name} <- Xs], [rabbit_misc:execute_mnesia_transaction( fun () -> mnesia:write( rabbit_durable_queue, rabbit_queue_decorator:set( - Q#amqqueue{policy = match(Name, Policies)}), write) + Q#amqqueue{policy = match(Name, Policies, OpPolicies)}), write) end) || Q = #amqqueue{name = Name} <- Qs], ok. @@ -133,17 +137,23 @@ invalid_file() -> %%---------------------------------------------------------------------------- +parse_set_op(VHost, Name, Pattern, Definition, Priority, ApplyTo) -> + parse_set(<<"operator_policy">>, VHost, Name, Pattern, Definition, Priority, ApplyTo). + parse_set(VHost, Name, Pattern, Definition, Priority, ApplyTo) -> + parse_set(<<"policy">>, VHost, Name, Pattern, Definition, Priority, ApplyTo). + +parse_set(Type, VHost, Name, Pattern, Definition, Priority, ApplyTo) -> try list_to_integer(Priority) of - Num -> parse_set0(VHost, Name, Pattern, Definition, Num, ApplyTo) + Num -> parse_set0(Type, VHost, Name, Pattern, Definition, Num, ApplyTo) catch error:badarg -> {error, "~p priority must be a number", [Priority]} end. -parse_set0(VHost, Name, Pattern, Defn, Priority, ApplyTo) -> +parse_set0(Type, VHost, Name, Pattern, Defn, Priority, ApplyTo) -> case rabbit_misc:json_decode(Defn) of {ok, JSON} -> - set0(VHost, Name, + set0(Type, VHost, Name, [{<<"pattern">>, list_to_binary(Pattern)}, {<<"definition">>, rabbit_misc:json_to_term(JSON)}, {<<"priority">>, Priority}, @@ -152,7 +162,13 @@ parse_set0(VHost, Name, Pattern, Defn, Priority, ApplyTo) -> {error_string, "JSON decoding error"} end. +set_op(VHost, Name, Pattern, Definition, Priority, ApplyTo) -> + set(<<"operator_policy">>, VHost, Name, Pattern, Definition, Priority, ApplyTo). + set(VHost, Name, Pattern, Definition, Priority, ApplyTo) -> + set(<<"policy">>, VHost, Name, Pattern, Definition, Priority, ApplyTo). + +set(Type, VHost, Name, Pattern, Definition, Priority, ApplyTo) -> PolicyProps = [{<<"pattern">>, Pattern}, {<<"definition">>, Definition}, {<<"priority">>, case Priority of @@ -163,20 +179,47 @@ set(VHost, Name, Pattern, Definition, Priority, ApplyTo) -> undefined -> <<"all">>; _ -> ApplyTo end}], - set0(VHost, Name, PolicyProps). + set0(Type, VHost, Name, PolicyProps). + +set0(Type, VHost, Name, Term) -> + rabbit_runtime_parameters:set_any(VHost, Type, Name, Term, none). -set0(VHost, Name, Term) -> - rabbit_runtime_parameters:set_any(VHost, <<"policy">>, Name, Term, none). +delete_op(VHost, Name) -> + rabbit_runtime_parameters:clear_any(VHost, <<"operator_policy">>, Name). delete(VHost, Name) -> rabbit_runtime_parameters:clear_any(VHost, <<"policy">>, Name). +lookup_op(VHost, Name) -> + case rabbit_runtime_parameters:lookup(VHost, <<"operator_policy">>, Name) of + not_found -> not_found; + P -> p(P, fun ident/1) + end. + lookup(VHost, Name) -> case rabbit_runtime_parameters:lookup(VHost, <<"policy">>, Name) of not_found -> not_found; P -> p(P, fun ident/1) end. +list_op() -> + list_op('_'). + +list_op(VHost) -> + list0_op(VHost, fun ident/1). + +list_formatted_op(VHost) -> + order_policies(list0_op(VHost, fun format/1)). + +list_formatted_op(VHost, Ref, AggregatorPid) -> + rabbit_control_misc:emitting_map(AggregatorPid, Ref, + fun(P) -> P end, list_formatted_op(VHost)). + +list0_op(VHost, DefnFun) -> + [p(P, DefnFun) + || P <- rabbit_runtime_parameters:list(VHost, <<"operator_policy">>)]. + + list() -> list('_'). @@ -194,8 +237,7 @@ list0(VHost, DefnFun) -> [p(P, DefnFun) || P <- rabbit_runtime_parameters:list(VHost, <<"policy">>)]. order_policies(PropList) -> - lists:sort(fun (A, B) -> pget(priority, A) < pget(priority, B) end, - PropList). + lists:sort(fun (A, B) -> not sort_pred(A, B) end, PropList). p(Parameter, DefnFun) -> Value = pget(value, Parameter), @@ -239,26 +281,26 @@ update_policies(VHost) -> Tabs = [rabbit_queue, rabbit_durable_queue, rabbit_exchange, rabbit_durable_exchange], {Xs, Qs} = rabbit_misc:execute_mnesia_transaction( - fun() -> - [mnesia:lock({table, T}, write) || T <- Tabs], %% [1] - case catch list(VHost) of - {'EXIT', {throw, {error, {no_such_vhost, _}}}} -> - {[], []}; %% [2] - {'EXIT', Exit} -> - exit(Exit); - Policies -> - {[update_exchange(X, Policies) || - X <- rabbit_exchange:list(VHost)], - [update_queue(Q, Policies) || - Q <- rabbit_amqqueue:list(VHost)]} - end - end), + fun() -> + [mnesia:lock({table, T}, write) || T <- Tabs], %% [1] + case catch {list(VHost), list_op(VHost)} of + {'EXIT', {throw, {error, {no_such_vhost, _}}}} -> + {[], []}; %% [2] + {'EXIT', Exit} -> + exit(Exit); + {Policies, OpPolicies} -> + {[update_exchange(X, Policies, OpPolicies) || + X <- rabbit_exchange:list(VHost)], + [update_queue(Q, Policies, OpPolicies) || + Q <- rabbit_amqqueue:list(VHost)]} + end + end), [catch notify(X) || X <- Xs], [catch notify(Q) || Q <- Qs], ok. -update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) -> - case match(XName, Policies) of +update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies, OpPolicies) -> + case match(XName, Policies, OpPolicies) of OldPolicy -> no_change; NewPolicy -> case rabbit_exchange:update( XName, fun (X0) -> @@ -270,8 +312,8 @@ update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) -> end end. -update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) -> - case match(QName, Policies) of +update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies, OpPolicies) -> + case match(QName, Policies, OpPolicies) of OldPolicy -> no_change; NewPolicy -> case rabbit_amqqueue:update( QName, fun(Q1) -> @@ -292,12 +334,43 @@ notify({X1 = #exchange{}, X2 = #exchange{}}) -> notify({Q1 = #amqqueue{}, Q2 = #amqqueue{}}) -> rabbit_amqqueue:policy_changed(Q1, Q2). -match(Name, Policies) -> - case lists:sort(fun sort_pred/2, [P || P <- Policies, matches(Name, P)]) of - [] -> undefined; - [Policy | _Rest] -> Policy +match(Name, Policies, OpPolicies) -> + MatchedPolicies = match_all(Name, Policies), + MatchedOpPolicies = match_all(Name, OpPolicies), + case {MatchedPolicies, MatchedOpPolicies} of + {[], []} -> undefined; + {[Policy | _], []} -> Policy; + {[], [OpPolicy | _]} -> OpPolicy; + {[Policy | _], [OpPolicy | _]} -> merge_op_policy(Policy, OpPolicy) + end. + +merge_op_policy(Policy, OpPolicy) -> + OpDefinition = pget(definition, OpPolicy), + Definition = pget(definition, Policy), + ResultDefinition = lists:map(fun(Key) -> + case {pget(Key, Definition), pget(Key, OpDefinition)} of + {undefined, undefined} -> undefined; + {undefined, Val} -> Val; + {Val, undefined} -> Val; + {Val, OpVal} -> merge_policy_value(Key, Val, OpVal) + end + end), + lists:keyreplace(definition, 1, Policy, {definition, ResultDefinition}). + +merge_policy_value(Name, PolicyVal, OpVal) -> + case policy_merge_strategy(Name) of + undefined -> PolicyVal; + Strategy -> Strategy(PolicyVal, OpVal) end. +policy_merge_strategy(<<"message-ttl">>) -> fun erlang:min/2; +policy_merge_strategy(<<"max-length">>) -> fun erlang:min/2; +policy_merge_strategy(<<"max-length-bytes">>) -> fun erlang:min/2; +policy_merge_strategy(_) -> undefined. + +match_all(Name, Policies) -> + lists:sort(fun sort_pred/2, [P || P <- Policies, matches(Name, P)]). + matches(#resource{name = Name, kind = Kind, virtual_host = VHost}, Policy) -> matches_type(Kind, pget('apply-to', Policy)) andalso match =:= re:run(Name, pget(pattern, Policy), [{capture, none}]) andalso |
