summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniil Fedotov <dfedotov@pivotal.io>2016-08-24 15:41:03 +0100
committerDaniil Fedotov <dfedotov@pivotal.io>2016-08-24 15:52:50 +0100
commit086fd01ae1356c358bbc2ebc0f02a375b8cd618b (patch)
treea3c707f531c0af19d354f82bf2db0c9de26810f6
parentf057a8c90c103eb4d7012df47478fbfb51988f00 (diff)
downloadrabbitmq-server-git-086fd01ae1356c358bbc2ebc0f02a375b8cd618b.tar.gz
operator policies
-rw-r--r--src/rabbit_control_main.erl42
-rw-r--r--src/rabbit_policy.erl141
2 files changed, 139 insertions, 44 deletions
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index c96f662dda..13ca805bfd 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -285,14 +285,14 @@ action(start_app, Node, [], _Opts, Inform) ->
action(reset, Node, [], _Opts, Inform) ->
Inform("Resetting node ~p", [Node]),
- require_mnesia_stopped(Node,
+ require_mnesia_stopped(Node,
fun() ->
call(Node, {rabbit_mnesia, reset, []})
end);
action(force_reset, Node, [], _Opts, Inform) ->
Inform("Forcefully resetting node ~p", [Node]),
- require_mnesia_stopped(Node,
+ require_mnesia_stopped(Node,
fun() ->
call(Node, {rabbit_mnesia, force_reset, []})
end);
@@ -304,21 +304,21 @@ action(join_cluster, Node, [ClusterNodeS], Opts, Inform) ->
false -> disc
end,
Inform("Clustering node ~p with ~p", [Node, ClusterNode]),
- require_mnesia_stopped(Node,
+ require_mnesia_stopped(Node,
fun() ->
rpc_call(Node, rabbit_mnesia, join_cluster, [ClusterNode, NodeType])
end);
action(change_cluster_node_type, Node, ["ram"], _Opts, Inform) ->
Inform("Turning ~p into a ram node", [Node]),
- require_mnesia_stopped(Node,
+ require_mnesia_stopped(Node,
fun() ->
rpc_call(Node, rabbit_mnesia, change_cluster_node_type, [ram])
end);
action(change_cluster_node_type, Node, [Type], _Opts, Inform)
when Type =:= "disc" orelse Type =:= "disk" ->
Inform("Turning ~p into a disc node", [Node]),
- require_mnesia_stopped(Node,
+ require_mnesia_stopped(Node,
fun() ->
rpc_call(Node, rabbit_mnesia, change_cluster_node_type, [disc])
end);
@@ -326,7 +326,7 @@ action(change_cluster_node_type, Node, [Type], _Opts, Inform)
action(update_cluster_nodes, Node, [ClusterNodeS], _Opts, Inform) ->
ClusterNode = list_to_atom(ClusterNodeS),
Inform("Updating cluster nodes for ~p from ~p", [Node, ClusterNode]),
- require_mnesia_stopped(Node,
+ require_mnesia_stopped(Node,
fun() ->
rpc_call(Node, rabbit_mnesia, update_cluster_nodes, [ClusterNode])
end);
@@ -488,9 +488,9 @@ action(set_disk_free_limit, Node, ["mem_relative", Arg], _Opts, Inform) ->
_ -> Arg
end),
Inform("Setting disk free limit on ~p to ~p of total RAM", [Node, Frac]),
- rpc_call(Node,
- rabbit_disk_monitor,
- set_disk_free_limit,
+ rpc_call(Node,
+ rabbit_disk_monitor,
+ set_disk_free_limit,
[{mem_relative, Frac}]);
@@ -544,6 +544,28 @@ action(clear_policy, Node, [Key], Opts, Inform) ->
Inform("Clearing policy ~p", [Key]),
rpc_call(Node, rabbit_policy, delete, [VHostArg, list_to_binary(Key)]);
+action(set_operator_policy, Node, [Key, Pattern, Defn], Opts, Inform) ->
+ Msg = "Setting operator policy override ~p for pattern ~p to ~p with priority ~p",
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ PriorityArg = proplists:get_value(?PRIORITY_OPT, Opts),
+ ApplyToArg = list_to_binary(proplists:get_value(?APPLY_TO_OPT, Opts)),
+ Inform(Msg, [Key, Pattern, Defn, PriorityArg]),
+ Res = rpc_call(
+ Node, rabbit_policy, parse_set_op,
+ [VHostArg, list_to_binary(Key), Pattern, Defn, PriorityArg, ApplyToArg]),
+ case Res of
+ {error, Format, Args} when is_list(Format) andalso is_list(Args) ->
+ {error_string, rabbit_misc:format(Format, Args)};
+ _ ->
+ Res
+ end;
+
+action(clear_operator_policy, Node, [Key], Opts, Inform) ->
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ Inform("Clearing operator policy ~p", [Key]),
+ rpc_call(Node, rabbit_policy, delete_op, [VHostArg, list_to_binary(Key)]);
+
+
action(report, Node, _Args, _Opts, Inform) ->
Inform("Reporting server status on ~p~n~n", [erlang:universaltime()]),
[begin ok = action(Action, N, [], [], Inform), io:nl() end ||
@@ -973,7 +995,7 @@ escape(Bin, IsEscaped) when is_binary(Bin) ->
escape(L, false) when is_list(L) ->
escape_char(lists:reverse(L), []);
escape(L, true) when is_list(L) ->
- L.
+ L.
escape_char([$\\ | T], Acc) ->
escape_char(T, [$\\, $\\ | Acc]);
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index a9caadf972..18d5c44261 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -46,6 +46,8 @@
-export([validate/5, notify/4, notify_clear/3]).
-export([parse_set/6, set/6, delete/2, lookup/2, list/0, list/1,
list_formatted/1, list_formatted/3, info_keys/0]).
+-export([parse_set_op/6, set_op/6, delete_op/2, lookup_op/2, list_op/0, list_op/1,
+ list_formatted_op/1, list_formatted_op/3]).
-rabbit_boot_step({?MODULE,
[{description, "policy parameters"},
@@ -65,13 +67,14 @@ name0(Policy) -> pget(name, Policy).
set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)};
set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}.
-set0(Name = #resource{virtual_host = VHost}) -> match(Name, list(VHost)).
+set0(Name = #resource{virtual_host = VHost}) ->
+ match(Name, list(VHost), list_op(VHost)).
get(Name, #amqqueue{policy = Policy}) -> get0(Name, Policy);
get(Name, #exchange{policy = Policy}) -> get0(Name, Policy);
%% Caution - SLOW.
get(Name, EntityName = #resource{virtual_host = VHost}) ->
- get0(Name, match(EntityName, list(VHost))).
+ get0(Name, match(EntityName, list(VHost), list_op(VHost))).
get0(_Name, undefined) -> undefined;
get0(Name, List) -> case pget(definition, List) of
@@ -112,19 +115,20 @@ recover0() ->
Xs = mnesia:dirty_match_object(rabbit_durable_exchange, #exchange{_ = '_'}),
Qs = mnesia:dirty_match_object(rabbit_durable_queue, #amqqueue{_ = '_'}),
Policies = list(),
+ OpPolicies = list_op(),
[rabbit_misc:execute_mnesia_transaction(
fun () ->
mnesia:write(
rabbit_durable_exchange,
rabbit_exchange_decorator:set(
- X#exchange{policy = match(Name, Policies)}), write)
+ X#exchange{policy = match(Name, Policies, OpPolicies)}), write)
end) || X = #exchange{name = Name} <- Xs],
[rabbit_misc:execute_mnesia_transaction(
fun () ->
mnesia:write(
rabbit_durable_queue,
rabbit_queue_decorator:set(
- Q#amqqueue{policy = match(Name, Policies)}), write)
+ Q#amqqueue{policy = match(Name, Policies, OpPolicies)}), write)
end) || Q = #amqqueue{name = Name} <- Qs],
ok.
@@ -133,17 +137,23 @@ invalid_file() ->
%%----------------------------------------------------------------------------
+parse_set_op(VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
+ parse_set(<<"operator_policy">>, VHost, Name, Pattern, Definition, Priority, ApplyTo).
+
parse_set(VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
+ parse_set(<<"policy">>, VHost, Name, Pattern, Definition, Priority, ApplyTo).
+
+parse_set(Type, VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
try list_to_integer(Priority) of
- Num -> parse_set0(VHost, Name, Pattern, Definition, Num, ApplyTo)
+ Num -> parse_set0(Type, VHost, Name, Pattern, Definition, Num, ApplyTo)
catch
error:badarg -> {error, "~p priority must be a number", [Priority]}
end.
-parse_set0(VHost, Name, Pattern, Defn, Priority, ApplyTo) ->
+parse_set0(Type, VHost, Name, Pattern, Defn, Priority, ApplyTo) ->
case rabbit_misc:json_decode(Defn) of
{ok, JSON} ->
- set0(VHost, Name,
+ set0(Type, VHost, Name,
[{<<"pattern">>, list_to_binary(Pattern)},
{<<"definition">>, rabbit_misc:json_to_term(JSON)},
{<<"priority">>, Priority},
@@ -152,7 +162,13 @@ parse_set0(VHost, Name, Pattern, Defn, Priority, ApplyTo) ->
{error_string, "JSON decoding error"}
end.
+set_op(VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
+ set(<<"operator_policy">>, VHost, Name, Pattern, Definition, Priority, ApplyTo).
+
set(VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
+ set(<<"policy">>, VHost, Name, Pattern, Definition, Priority, ApplyTo).
+
+set(Type, VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
PolicyProps = [{<<"pattern">>, Pattern},
{<<"definition">>, Definition},
{<<"priority">>, case Priority of
@@ -163,20 +179,47 @@ set(VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
undefined -> <<"all">>;
_ -> ApplyTo
end}],
- set0(VHost, Name, PolicyProps).
+ set0(Type, VHost, Name, PolicyProps).
+
+set0(Type, VHost, Name, Term) ->
+ rabbit_runtime_parameters:set_any(VHost, Type, Name, Term, none).
-set0(VHost, Name, Term) ->
- rabbit_runtime_parameters:set_any(VHost, <<"policy">>, Name, Term, none).
+delete_op(VHost, Name) ->
+ rabbit_runtime_parameters:clear_any(VHost, <<"operator_policy">>, Name).
delete(VHost, Name) ->
rabbit_runtime_parameters:clear_any(VHost, <<"policy">>, Name).
+lookup_op(VHost, Name) ->
+ case rabbit_runtime_parameters:lookup(VHost, <<"operator_policy">>, Name) of
+ not_found -> not_found;
+ P -> p(P, fun ident/1)
+ end.
+
lookup(VHost, Name) ->
case rabbit_runtime_parameters:lookup(VHost, <<"policy">>, Name) of
not_found -> not_found;
P -> p(P, fun ident/1)
end.
+list_op() ->
+ list_op('_').
+
+list_op(VHost) ->
+ list0_op(VHost, fun ident/1).
+
+list_formatted_op(VHost) ->
+ order_policies(list0_op(VHost, fun format/1)).
+
+list_formatted_op(VHost, Ref, AggregatorPid) ->
+ rabbit_control_misc:emitting_map(AggregatorPid, Ref,
+ fun(P) -> P end, list_formatted_op(VHost)).
+
+list0_op(VHost, DefnFun) ->
+ [p(P, DefnFun)
+ || P <- rabbit_runtime_parameters:list(VHost, <<"operator_policy">>)].
+
+
list() ->
list('_').
@@ -194,8 +237,7 @@ list0(VHost, DefnFun) ->
[p(P, DefnFun) || P <- rabbit_runtime_parameters:list(VHost, <<"policy">>)].
order_policies(PropList) ->
- lists:sort(fun (A, B) -> pget(priority, A) < pget(priority, B) end,
- PropList).
+ lists:sort(fun (A, B) -> not sort_pred(A, B) end, PropList).
p(Parameter, DefnFun) ->
Value = pget(value, Parameter),
@@ -239,26 +281,26 @@ update_policies(VHost) ->
Tabs = [rabbit_queue, rabbit_durable_queue,
rabbit_exchange, rabbit_durable_exchange],
{Xs, Qs} = rabbit_misc:execute_mnesia_transaction(
- fun() ->
- [mnesia:lock({table, T}, write) || T <- Tabs], %% [1]
- case catch list(VHost) of
- {'EXIT', {throw, {error, {no_such_vhost, _}}}} ->
- {[], []}; %% [2]
- {'EXIT', Exit} ->
- exit(Exit);
- Policies ->
- {[update_exchange(X, Policies) ||
- X <- rabbit_exchange:list(VHost)],
- [update_queue(Q, Policies) ||
- Q <- rabbit_amqqueue:list(VHost)]}
- end
- end),
+ fun() ->
+ [mnesia:lock({table, T}, write) || T <- Tabs], %% [1]
+ case catch {list(VHost), list_op(VHost)} of
+ {'EXIT', {throw, {error, {no_such_vhost, _}}}} ->
+ {[], []}; %% [2]
+ {'EXIT', Exit} ->
+ exit(Exit);
+ {Policies, OpPolicies} ->
+ {[update_exchange(X, Policies, OpPolicies) ||
+ X <- rabbit_exchange:list(VHost)],
+ [update_queue(Q, Policies, OpPolicies) ||
+ Q <- rabbit_amqqueue:list(VHost)]}
+ end
+ end),
[catch notify(X) || X <- Xs],
[catch notify(Q) || Q <- Qs],
ok.
-update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) ->
- case match(XName, Policies) of
+update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies, OpPolicies) ->
+ case match(XName, Policies, OpPolicies) of
OldPolicy -> no_change;
NewPolicy -> case rabbit_exchange:update(
XName, fun (X0) ->
@@ -270,8 +312,8 @@ update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) ->
end
end.
-update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) ->
- case match(QName, Policies) of
+update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies, OpPolicies) ->
+ case match(QName, Policies, OpPolicies) of
OldPolicy -> no_change;
NewPolicy -> case rabbit_amqqueue:update(
QName, fun(Q1) ->
@@ -292,12 +334,43 @@ notify({X1 = #exchange{}, X2 = #exchange{}}) ->
notify({Q1 = #amqqueue{}, Q2 = #amqqueue{}}) ->
rabbit_amqqueue:policy_changed(Q1, Q2).
-match(Name, Policies) ->
- case lists:sort(fun sort_pred/2, [P || P <- Policies, matches(Name, P)]) of
- [] -> undefined;
- [Policy | _Rest] -> Policy
+match(Name, Policies, OpPolicies) ->
+ MatchedPolicies = match_all(Name, Policies),
+ MatchedOpPolicies = match_all(Name, OpPolicies),
+ case {MatchedPolicies, MatchedOpPolicies} of
+ {[], []} -> undefined;
+ {[Policy | _], []} -> Policy;
+ {[], [OpPolicy | _]} -> OpPolicy;
+ {[Policy | _], [OpPolicy | _]} -> merge_op_policy(Policy, OpPolicy)
+ end.
+
+merge_op_policy(Policy, OpPolicy) ->
+ OpDefinition = pget(definition, OpPolicy),
+ Definition = pget(definition, Policy),
+ ResultDefinition = lists:map(fun(Key) ->
+ case {pget(Key, Definition), pget(Key, OpDefinition)} of
+ {undefined, undefined} -> undefined;
+ {undefined, Val} -> Val;
+ {Val, undefined} -> Val;
+ {Val, OpVal} -> merge_policy_value(Key, Val, OpVal)
+ end
+ end),
+ lists:keyreplace(definition, 1, Policy, {definition, ResultDefinition}).
+
+merge_policy_value(Name, PolicyVal, OpVal) ->
+ case policy_merge_strategy(Name) of
+ undefined -> PolicyVal;
+ Strategy -> Strategy(PolicyVal, OpVal)
end.
+policy_merge_strategy(<<"message-ttl">>) -> fun erlang:min/2;
+policy_merge_strategy(<<"max-length">>) -> fun erlang:min/2;
+policy_merge_strategy(<<"max-length-bytes">>) -> fun erlang:min/2;
+policy_merge_strategy(_) -> undefined.
+
+match_all(Name, Policies) ->
+ lists:sort(fun sort_pred/2, [P || P <- Policies, matches(Name, P)]).
+
matches(#resource{name = Name, kind = Kind, virtual_host = VHost}, Policy) ->
matches_type(Kind, pget('apply-to', Policy)) andalso
match =:= re:run(Name, pget(pattern, Policy), [{capture, none}]) andalso