summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2016-09-01 19:21:05 +0400
committerGitHub <noreply@github.com>2016-09-01 19:21:05 +0400
commit6fd6548bb68356cceb50679767446a1ea0f3c55d (patch)
tree339059817e13a6e17d8a4244f4ec664fe9612b3f
parent7b8f649e762c00f792b8472ccb4c059f95ff4fed (diff)
parent26376b8d0b9347f5c62e24b33b53f8e02b516c54 (diff)
downloadrabbitmq-server-git-6fd6548bb68356cceb50679767446a1ea0f3c55d.tar.gz
Merge pull request #938 from rabbitmq/rabbitmq-server-930
Operator policies
-rw-r--r--docs/rabbitmqctl.1.xml25
-rw-r--r--src/rabbit_amqqueue_process.erl12
-rw-r--r--src/rabbit_control_main.erl55
-rw-r--r--src/rabbit_policies.erl19
-rw-r--r--src/rabbit_policy.erl283
-rw-r--r--src/rabbit_policy_merge_strategy.erl28
-rw-r--r--src/rabbit_upgrade_functions.erl32
-rw-r--r--test/policy_SUITE.erl157
8 files changed, 528 insertions, 83 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index a2ebc0b817..023e8ec0c5 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1209,6 +1209,31 @@
</para>
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><command>set_operator_policy</command> <arg choice="opt">-p <replaceable>vhost</replaceable></arg> <arg choice="opt">--priority <replaceable>priority</replaceable></arg> <arg choice="opt">--apply-to <replaceable>apply-to</replaceable></arg> <arg choice="req"><replaceable>name</replaceable></arg> <arg choice="req"><replaceable>pattern</replaceable></arg> <arg choice="req"><replaceable>definition</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Sets an operator policy that overrides a subset of arguments in user policies. Arguments are identical to those of <command>set_policy</command>.
+ Supported arguments: <command>expires</command>, <command>message-ttl</command>, <command>max-length</command>, and <command>max-length-bytes</command>.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><command>clear_operator_policy</command> <arg choice="opt">-p <replaceable>vhost</replaceable></arg> <arg choice="req"><replaceable>name</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Clears an operator policy. Arguments are identical to those of <command>clear_policy</command>.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><command>list_operator_policies</command> <arg choice="opt">-p <replaceable>vhost</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Lists operator policy overrides for a virtual host. Arguments are identical to those of <command>list_policies</command>.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
</refsect2>
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 710ec6ab2a..2c42e348ca 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -107,6 +107,8 @@
-define(STATISTICS_KEYS,
[name,
policy,
+ operator_policy,
+ effective_policy_definition,
exclusive_consumer_pid,
exclusive_consumer_tag,
messages_ready,
@@ -887,6 +889,16 @@ i(policy, #q{q = Q}) ->
none -> '';
Policy -> Policy
end;
+i(operator_policy, #q{q = Q}) ->
+ case rabbit_policy:name_op(Q) of
+ none -> '';
+ Policy -> Policy
+ end;
+i(effective_policy_definition, #q{q = Q}) ->
+ case rabbit_policy:effective_definition(Q) of
+ undefined -> [];
+ Def -> Def
+ end;
i(exclusive_consumer_pid, #q{exclusive_consumer = none}) ->
'';
i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) ->
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 0e99ac0f7e..f48f0349aa 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -72,7 +72,10 @@
{set_policy, [?VHOST_DEF, ?PRIORITY_DEF, ?APPLY_TO_DEF]},
{clear_policy, [?VHOST_DEF]},
+ {set_operator_policy, [?VHOST_DEF, ?PRIORITY_DEF, ?APPLY_TO_DEF]},
+ {clear_operator_policy, [?VHOST_DEF]},
{list_policies, [?VHOST_DEF]},
+ {list_operator_policies, [?VHOST_DEF]},
{set_vhost_limits, [?VHOST_DEF]},
{clear_vhost_limits, [?VHOST_DEF]},
@@ -287,14 +290,14 @@ action(start_app, Node, [], _Opts, Inform) ->
action(reset, Node, [], _Opts, Inform) ->
Inform("Resetting node ~p", [Node]),
- require_mnesia_stopped(Node,
+ require_mnesia_stopped(Node,
fun() ->
call(Node, {rabbit_mnesia, reset, []})
end);
action(force_reset, Node, [], _Opts, Inform) ->
Inform("Forcefully resetting node ~p", [Node]),
- require_mnesia_stopped(Node,
+ require_mnesia_stopped(Node,
fun() ->
call(Node, {rabbit_mnesia, force_reset, []})
end);
@@ -306,21 +309,21 @@ action(join_cluster, Node, [ClusterNodeS], Opts, Inform) ->
false -> disc
end,
Inform("Clustering node ~p with ~p", [Node, ClusterNode]),
- require_mnesia_stopped(Node,
+ require_mnesia_stopped(Node,
fun() ->
rpc_call(Node, rabbit_mnesia, join_cluster, [ClusterNode, NodeType])
end);
action(change_cluster_node_type, Node, ["ram"], _Opts, Inform) ->
Inform("Turning ~p into a ram node", [Node]),
- require_mnesia_stopped(Node,
+ require_mnesia_stopped(Node,
fun() ->
rpc_call(Node, rabbit_mnesia, change_cluster_node_type, [ram])
end);
action(change_cluster_node_type, Node, [Type], _Opts, Inform)
when Type =:= "disc" orelse Type =:= "disk" ->
Inform("Turning ~p into a disc node", [Node]),
- require_mnesia_stopped(Node,
+ require_mnesia_stopped(Node,
fun() ->
rpc_call(Node, rabbit_mnesia, change_cluster_node_type, [disc])
end);
@@ -328,7 +331,7 @@ action(change_cluster_node_type, Node, [Type], _Opts, Inform)
action(update_cluster_nodes, Node, [ClusterNodeS], _Opts, Inform) ->
ClusterNode = list_to_atom(ClusterNodeS),
Inform("Updating cluster nodes for ~p from ~p", [Node, ClusterNode]),
- require_mnesia_stopped(Node,
+ require_mnesia_stopped(Node,
fun() ->
rpc_call(Node, rabbit_mnesia, update_cluster_nodes, [ClusterNode])
end);
@@ -490,9 +493,9 @@ action(set_disk_free_limit, Node, ["mem_relative", Arg], _Opts, Inform) ->
_ -> Arg
end),
Inform("Setting disk free limit on ~p to ~p of total RAM", [Node, Frac]),
- rpc_call(Node,
- rabbit_disk_monitor,
- set_disk_free_limit,
+ rpc_call(Node,
+ rabbit_disk_monitor,
+ set_disk_free_limit,
[{mem_relative, Frac}]);
@@ -546,6 +549,27 @@ action(clear_policy, Node, [Key], Opts, Inform) ->
Inform("Clearing policy ~p", [Key]),
rpc_call(Node, rabbit_policy, delete, [VHostArg, list_to_binary(Key)]);
+action(set_operator_policy, Node, [Key, Pattern, Defn], Opts, Inform) ->
+ Msg = "Setting operator policy override ~p for pattern ~p to ~p with priority ~p",
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ PriorityArg = proplists:get_value(?PRIORITY_OPT, Opts),
+ ApplyToArg = list_to_binary(proplists:get_value(?APPLY_TO_OPT, Opts)),
+ Inform(Msg, [Key, Pattern, Defn, PriorityArg]),
+ Res = rpc_call(
+ Node, rabbit_policy, parse_set_op,
+ [VHostArg, list_to_binary(Key), Pattern, Defn, PriorityArg, ApplyToArg]),
+ case Res of
+ {error, Format, Args} when is_list(Format) andalso is_list(Args) ->
+ {error_string, rabbit_misc:format(Format, Args)};
+ _ ->
+ Res
+ end;
+
+action(clear_operator_policy, Node, [Key], Opts, Inform) ->
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ Inform("Clearing operator policy ~p", [Key]),
+ rpc_call(Node, rabbit_policy, delete_op, [VHostArg, list_to_binary(Key)]);
+
action(set_vhost_limits, Node, [Defn], Opts, Inform) ->
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
Inform("Setting vhost limits for vhost ~p", [VHostArg]),
@@ -630,6 +654,14 @@ action(list_policies, Node, [], Opts, Inform, Timeout) ->
rabbit_policy:info_keys(),
[{timeout, Timeout}]);
+action(list_operator_policies, Node, [], Opts, Inform, Timeout) ->
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ Inform("Listing policies", []),
+ call_emitter(Node, {rabbit_policy, list_formatted_op, [VHostArg]},
+ rabbit_policy:info_keys(),
+ [{timeout, Timeout}]);
+
+
action(list_vhosts, Node, Args, _Opts, Inform, Timeout) ->
Inform("Listing vhosts", []),
ArgAtoms = default_if_empty(Args, [name]),
@@ -898,6 +930,9 @@ format_info_item([T | _] = Value, IsEscaped)
lists:nthtail(2, lists:append(
[", " ++ format_info_item(E, IsEscaped)
|| E <- Value])) ++ "]";
+format_info_item({Key, Value}, IsEscaped) ->
+ "{" ++ io_lib:format("~p", [Key]) ++ ", " ++
+ format_info_item(Value, IsEscaped) ++ "}";
format_info_item(Value, _IsEscaped) ->
io_lib:format("~w", [Value]).
@@ -986,7 +1021,7 @@ escape(Bin, IsEscaped) when is_binary(Bin) ->
escape(L, false) when is_list(L) ->
escape_char(lists:reverse(L), []);
escape(L, true) when is_list(L) ->
- L.
+ L.
escape_char([$\\ | T], Acc) ->
escape_char(T, [$\\, $\\ | Acc]);
diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl
index c7d4c99f37..22eb3fde18 100644
--- a/src/rabbit_policies.erl
+++ b/src/rabbit_policies.erl
@@ -20,10 +20,11 @@
%% validation functions.
-behaviour(rabbit_policy_validator).
+-behaviour(rabbit_policy_merge_strategy).
-include("rabbit.hrl").
--export([register/0, validate_policy/1]).
+-export([register/0, validate_policy/1, merge_policy_value/3]).
-rabbit_boot_step({?MODULE,
[{description, "internal policies"},
@@ -40,7 +41,15 @@ register() ->
{policy_validator, <<"expires">>},
{policy_validator, <<"max-length">>},
{policy_validator, <<"max-length-bytes">>},
- {policy_validator, <<"queue-mode">>}]],
+ {policy_validator, <<"queue-mode">>},
+ {operator_policy_validator, <<"expires">>},
+ {operator_policy_validator, <<"message-ttl">>},
+ {operator_policy_validator, <<"max-length">>},
+ {operator_policy_validator, <<"max-length-bytes">>},
+ {policy_merge_strategy, <<"expires">>},
+ {policy_merge_strategy, <<"message-ttl">>},
+ {policy_merge_strategy, <<"max-length">>},
+ {policy_merge_strategy, <<"max-length-bytes">>}]],
ok.
validate_policy(Terms) ->
@@ -96,3 +105,9 @@ validate_policy0(<<"queue-mode">>, <<"lazy">>) ->
ok;
validate_policy0(<<"queue-mode">>, Value) ->
{error, "~p is not a valid queue-mode value", [Value]}.
+
+merge_policy_value(<<"message-ttl">>, Val, OpVal) -> min(Val, OpVal);
+merge_policy_value(<<"max-length">>, Val, OpVal) -> min(Val, OpVal);
+merge_policy_value(<<"max-length-bytes">>, Val, OpVal) -> min(Val, OpVal);
+merge_policy_value(<<"expires">>, Val, OpVal) -> min(Val, OpVal).
+
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index a9caadf972..7e39164882 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -38,14 +38,16 @@
-include("rabbit.hrl").
--import(rabbit_misc, [pget/2]).
+-import(rabbit_misc, [pget/2, pget/3]).
-export([register/0]).
-export([invalidate/0, recover/0]).
--export([name/1, get/2, get_arg/3, set/1]).
+-export([name/1, name_op/1, effective_definition/1, get/2, get_arg/3, set/1]).
-export([validate/5, notify/4, notify_clear/3]).
-export([parse_set/6, set/6, delete/2, lookup/2, list/0, list/1,
list_formatted/1, list_formatted/3, info_keys/0]).
+-export([parse_set_op/6, set_op/6, delete_op/2, lookup_op/2, list_op/0, list_op/1,
+ list_formatted_op/1, list_formatted_op/3]).
-rabbit_boot_step({?MODULE,
[{description, "policy parameters"},
@@ -54,30 +56,88 @@
{enables, recovery}]}).
register() ->
- rabbit_registry:register(runtime_parameter, <<"policy">>, ?MODULE).
+ rabbit_registry:register(runtime_parameter, <<"policy">>, ?MODULE),
+ rabbit_registry:register(runtime_parameter, <<"operator_policy">>, ?MODULE).
name(#amqqueue{policy = Policy}) -> name0(Policy);
name(#exchange{policy = Policy}) -> name0(Policy).
+name_op(#amqqueue{operator_policy = Policy}) -> name0(Policy);
+name_op(#exchange{operator_policy = Policy}) -> name0(Policy).
+
name0(undefined) -> none;
name0(Policy) -> pget(name, Policy).
-set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)};
-set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}.
-
-set0(Name = #resource{virtual_host = VHost}) -> match(Name, list(VHost)).
+effective_definition(#amqqueue{policy = Policy, operator_policy = OpPolicy}) ->
+ effective_definition0(Policy, OpPolicy);
+effective_definition(#exchange{policy = Policy, operator_policy = OpPolicy}) ->
+ effective_definition0(Policy, OpPolicy).
+
+effective_definition0(undefined, undefined) -> undefined;
+effective_definition0(Policy, undefined) -> pget(definition, Policy);
+effective_definition0(undefined, OpPolicy) -> pget(definition, OpPolicy);
+effective_definition0(Policy, OpPolicy) ->
+ OpDefinition = pget(definition, OpPolicy, []),
+ Definition = pget(definition, Policy, []),
+ {Keys, _} = lists:unzip(Definition),
+ {OpKeys, _} = lists:unzip(OpDefinition),
+ lists:map(fun(Key) ->
+ case {pget(Key, Definition), pget(Key, OpDefinition)} of
+ {Val, undefined} -> {Key, Val};
+ {undefined, Val} -> {Key, Val};
+ {Val, OpVal} -> {Key, merge_policy_value(Key, Val, OpVal)}
+ end
+ end,
+ lists:umerge(Keys, OpKeys)).
+
+set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = match(Name),
+ operator_policy = match_op(Name)};
+set(X = #exchange{name = Name}) -> X#exchange{policy = match(Name),
+ operator_policy = match_op(Name)}.
+
+match(Name = #resource{virtual_host = VHost}) ->
+ match(Name, list(VHost)).
+
+match_op(Name = #resource{virtual_host = VHost}) ->
+ match(Name, list_op(VHost)).
+
+get(Name, #amqqueue{policy = Policy, operator_policy = OpPolicy}) ->
+ get0(Name, Policy, OpPolicy);
+get(Name, #exchange{policy = Policy, operator_policy = OpPolicy}) ->
+ get0(Name, Policy, OpPolicy);
-get(Name, #amqqueue{policy = Policy}) -> get0(Name, Policy);
-get(Name, #exchange{policy = Policy}) -> get0(Name, Policy);
%% Caution - SLOW.
get(Name, EntityName = #resource{virtual_host = VHost}) ->
- get0(Name, match(EntityName, list(VHost))).
+ get0(Name,
+ match(EntityName, list(VHost)),
+ match(EntityName, list_op(VHost))).
+
+get0(_Name, undefined, undefined) -> undefined;
+get0(Name, undefined, OpPolicy) -> pget(Name, pget(definition, OpPolicy, []));
+get0(Name, Policy, undefined) -> pget(Name, pget(definition, Policy, []));
+get0(Name, Policy, OpPolicy) ->
+ OpDefinition = pget(definition, OpPolicy, []),
+ Definition = pget(definition, Policy, []),
+ case {pget(Name, Definition), pget(Name, OpDefinition)} of
+ {undefined, undefined} -> undefined;
+ {Val, undefined} -> Val;
+ {undefined, Val} -> Val;
+ {Val, OpVal} -> merge_policy_value(Name, Val, OpVal)
+ end.
+
+merge_policy_value(Name, PolicyVal, OpVal) ->
+ case policy_merge_strategy(Name) of
+ {ok, Module} -> Module:merge_policy_value(Name, PolicyVal, OpVal);
+ {error, not_found} -> PolicyVal
+ end.
-get0(_Name, undefined) -> undefined;
-get0(Name, List) -> case pget(definition, List) of
- undefined -> undefined;
- Policy -> pget(Name, Policy)
- end.
+policy_merge_strategy(Name) ->
+ case rabbit_registry:binary_to_type(Name) of
+ {error, not_found} ->
+ {error, not_found};
+ T ->
+ rabbit_registry:lookup_module(policy_merge_strategy, T)
+ end.
%% Many heads for optimisation
get_arg(_AName, _PName, #exchange{arguments = [], policy = undefined}) ->
@@ -112,19 +172,24 @@ recover0() ->
Xs = mnesia:dirty_match_object(rabbit_durable_exchange, #exchange{_ = '_'}),
Qs = mnesia:dirty_match_object(rabbit_durable_queue, #amqqueue{_ = '_'}),
Policies = list(),
+ OpPolicies = list_op(),
[rabbit_misc:execute_mnesia_transaction(
fun () ->
mnesia:write(
rabbit_durable_exchange,
rabbit_exchange_decorator:set(
- X#exchange{policy = match(Name, Policies)}), write)
+ X#exchange{policy = match(Name, Policies),
+ operator_policy = match(Name, OpPolicies)}),
+ write)
end) || X = #exchange{name = Name} <- Xs],
[rabbit_misc:execute_mnesia_transaction(
fun () ->
mnesia:write(
rabbit_durable_queue,
rabbit_queue_decorator:set(
- Q#amqqueue{policy = match(Name, Policies)}), write)
+ Q#amqqueue{policy = match(Name, Policies),
+ operator_policy = match(Name, OpPolicies)}),
+ write)
end) || Q = #amqqueue{name = Name} <- Qs],
ok.
@@ -133,17 +198,23 @@ invalid_file() ->
%%----------------------------------------------------------------------------
+parse_set_op(VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
+ parse_set(<<"operator_policy">>, VHost, Name, Pattern, Definition, Priority, ApplyTo).
+
parse_set(VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
+ parse_set(<<"policy">>, VHost, Name, Pattern, Definition, Priority, ApplyTo).
+
+parse_set(Type, VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
try list_to_integer(Priority) of
- Num -> parse_set0(VHost, Name, Pattern, Definition, Num, ApplyTo)
+ Num -> parse_set0(Type, VHost, Name, Pattern, Definition, Num, ApplyTo)
catch
error:badarg -> {error, "~p priority must be a number", [Priority]}
end.
-parse_set0(VHost, Name, Pattern, Defn, Priority, ApplyTo) ->
+parse_set0(Type, VHost, Name, Pattern, Defn, Priority, ApplyTo) ->
case rabbit_misc:json_decode(Defn) of
{ok, JSON} ->
- set0(VHost, Name,
+ set0(Type, VHost, Name,
[{<<"pattern">>, list_to_binary(Pattern)},
{<<"definition">>, rabbit_misc:json_to_term(JSON)},
{<<"priority">>, Priority},
@@ -152,7 +223,13 @@ parse_set0(VHost, Name, Pattern, Defn, Priority, ApplyTo) ->
{error_string, "JSON decoding error"}
end.
+set_op(VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
+ set(<<"operator_policy">>, VHost, Name, Pattern, Definition, Priority, ApplyTo).
+
set(VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
+ set(<<"policy">>, VHost, Name, Pattern, Definition, Priority, ApplyTo).
+
+set(Type, VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
PolicyProps = [{<<"pattern">>, Pattern},
{<<"definition">>, Definition},
{<<"priority">>, case Priority of
@@ -163,20 +240,47 @@ set(VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
undefined -> <<"all">>;
_ -> ApplyTo
end}],
- set0(VHost, Name, PolicyProps).
+ set0(Type, VHost, Name, PolicyProps).
-set0(VHost, Name, Term) ->
- rabbit_runtime_parameters:set_any(VHost, <<"policy">>, Name, Term, none).
+set0(Type, VHost, Name, Term) ->
+ rabbit_runtime_parameters:set_any(VHost, Type, Name, Term, none).
+
+delete_op(VHost, Name) ->
+ rabbit_runtime_parameters:clear_any(VHost, <<"operator_policy">>, Name).
delete(VHost, Name) ->
rabbit_runtime_parameters:clear_any(VHost, <<"policy">>, Name).
+lookup_op(VHost, Name) ->
+ case rabbit_runtime_parameters:lookup(VHost, <<"operator_policy">>, Name) of
+ not_found -> not_found;
+ P -> p(P, fun ident/1)
+ end.
+
lookup(VHost, Name) ->
case rabbit_runtime_parameters:lookup(VHost, <<"policy">>, Name) of
not_found -> not_found;
P -> p(P, fun ident/1)
end.
+list_op() ->
+ list_op('_').
+
+list_op(VHost) ->
+ list0_op(VHost, fun ident/1).
+
+list_formatted_op(VHost) ->
+ order_policies(list0_op(VHost, fun format/1)).
+
+list_formatted_op(VHost, Ref, AggregatorPid) ->
+ rabbit_control_misc:emitting_map(AggregatorPid, Ref,
+ fun(P) -> P end, list_formatted_op(VHost)).
+
+list0_op(VHost, DefnFun) ->
+ [p(P, DefnFun)
+ || P <- rabbit_runtime_parameters:list(VHost, <<"operator_policy">>)].
+
+
list() ->
list('_').
@@ -194,8 +298,7 @@ list0(VHost, DefnFun) ->
[p(P, DefnFun) || P <- rabbit_runtime_parameters:list(VHost, <<"policy">>)].
order_policies(PropList) ->
- lists:sort(fun (A, B) -> pget(priority, A) < pget(priority, B) end,
- PropList).
+ lists:sort(fun (A, B) -> not sort_pred(A, B) end, PropList).
p(Parameter, DefnFun) ->
Value = pget(value, Parameter),
@@ -218,14 +321,23 @@ info_keys() -> [vhost, name, 'apply-to', pattern, definition, priority].
validate(_VHost, <<"policy">>, Name, Term, _User) ->
rabbit_parameter_validation:proplist(
- Name, policy_validation(), Term).
+ Name, policy_validation(), Term);
+validate(_VHost, <<"operator_policy">>, Name, Term, _User) ->
+ rabbit_parameter_validation:proplist(
+ Name, operator_policy_validation(), Term).
notify(VHost, <<"policy">>, Name, Term) ->
rabbit_event:notify(policy_set, [{name, Name}, {vhost, VHost} | Term]),
+ update_policies(VHost);
+notify(VHost, <<"operator_policy">>, Name, Term) ->
+ rabbit_event:notify(policy_set, [{name, Name}, {vhost, VHost} | Term]),
update_policies(VHost).
notify_clear(VHost, <<"policy">>, Name) ->
rabbit_event:notify(policy_cleared, [{name, Name}, {vhost, VHost}]),
+ update_policies(VHost);
+notify_clear(VHost, <<"operator_policy">>, Name) ->
+ rabbit_event:notify(operator_policy_cleared, [{name, Name}, {vhost, VHost}]),
update_policies(VHost).
%%----------------------------------------------------------------------------
@@ -239,50 +351,64 @@ update_policies(VHost) ->
Tabs = [rabbit_queue, rabbit_durable_queue,
rabbit_exchange, rabbit_durable_exchange],
{Xs, Qs} = rabbit_misc:execute_mnesia_transaction(
- fun() ->
- [mnesia:lock({table, T}, write) || T <- Tabs], %% [1]
- case catch list(VHost) of
- {'EXIT', {throw, {error, {no_such_vhost, _}}}} ->
- {[], []}; %% [2]
- {'EXIT', Exit} ->
- exit(Exit);
- Policies ->
- {[update_exchange(X, Policies) ||
- X <- rabbit_exchange:list(VHost)],
- [update_queue(Q, Policies) ||
- Q <- rabbit_amqqueue:list(VHost)]}
- end
- end),
+ fun() ->
+ [mnesia:lock({table, T}, write) || T <- Tabs], %% [1]
+ case catch {list(VHost), list_op(VHost)} of
+ {'EXIT', {throw, {error, {no_such_vhost, _}}}} ->
+ {[], []}; %% [2]
+ {'EXIT', Exit} ->
+ exit(Exit);
+ {Policies, OpPolicies} ->
+ {[update_exchange(X, Policies, OpPolicies) ||
+ X <- rabbit_exchange:list(VHost)],
+ [update_queue(Q, Policies, OpPolicies) ||
+ Q <- rabbit_amqqueue:list(VHost)]}
+ end
+ end),
[catch notify(X) || X <- Xs],
[catch notify(Q) || Q <- Qs],
ok.
-update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) ->
- case match(XName, Policies) of
- OldPolicy -> no_change;
- NewPolicy -> case rabbit_exchange:update(
- XName, fun (X0) ->
- rabbit_exchange_decorator:set(
- X0 #exchange{policy = NewPolicy})
- end) of
- #exchange{} = X1 -> {X, X1};
- not_found -> {X, X }
- end
+update_exchange(X = #exchange{name = XName,
+ policy = OldPolicy,
+ operator_policy = OldOpPolicy},
+ Policies, OpPolicies) ->
+ case {match(XName, Policies), match(XName, OpPolicies)} of
+ {OldPolicy, OldOpPolicy} -> no_change;
+ {NewPolicy, NewOpPolicy} ->
+ NewExchange = rabbit_exchange:update(
+ XName,
+ fun(X0) ->
+ rabbit_exchange_decorator:set(
+ X0 #exchange{policy = NewPolicy,
+ operator_policy = NewOpPolicy})
+ end),
+ case NewExchange of
+ #exchange{} = X1 -> {X, X1};
+ not_found -> {X, X }
+ end
end.
-update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) ->
- case match(QName, Policies) of
- OldPolicy -> no_change;
- NewPolicy -> case rabbit_amqqueue:update(
- QName, fun(Q1) ->
- rabbit_queue_decorator:set(
- Q1#amqqueue{policy = NewPolicy,
- policy_version =
- Q1#amqqueue.policy_version + 1 })
- end) of
- #amqqueue{} = Q1 -> {Q, Q1};
- not_found -> {Q, Q }
- end
+update_queue(Q = #amqqueue{name = QName,
+ policy = OldPolicy,
+ operator_policy = OldOpPolicy},
+ Policies, OpPolicies) ->
+ case {match(QName, Policies), match(QName, OpPolicies)} of
+ {OldPolicy, OldOpPolicy} -> no_change;
+ {NewPolicy, NewOpPolicy} ->
+ NewQueue = rabbit_amqqueue:update(
+ QName,
+ fun(Q1) ->
+ rabbit_queue_decorator:set(
+ Q1#amqqueue{policy = NewPolicy,
+ operator_policy = NewOpPolicy,
+ policy_version =
+ Q1#amqqueue.policy_version + 1 })
+ end),
+ case NewQueue of
+ #amqqueue{} = Q1 -> {Q, Q1};
+ not_found -> {Q, Q }
+ end
end.
notify(no_change)->
@@ -293,11 +419,14 @@ notify({Q1 = #amqqueue{}, Q2 = #amqqueue{}}) ->
rabbit_amqqueue:policy_changed(Q1, Q2).
match(Name, Policies) ->
- case lists:sort(fun sort_pred/2, [P || P <- Policies, matches(Name, P)]) of
- [] -> undefined;
- [Policy | _Rest] -> Policy
+ case match_all(Name, Policies) of
+ [] -> undefined;
+ [Policy | _] -> Policy
end.
+match_all(Name, Policies) ->
+ lists:sort(fun sort_pred/2, [P || P <- Policies, matches(Name, P)]).
+
matches(#resource{name = Name, kind = Kind, virtual_host = VHost}, Policy) ->
matches_type(Kind, pget('apply-to', Policy)) andalso
match =:= re:run(Name, pget(pattern, Policy), [{capture, none}]) andalso
@@ -313,17 +442,29 @@ sort_pred(A, B) -> pget(priority, A) >= pget(priority, B).
%%----------------------------------------------------------------------------
+operator_policy_validation() ->
+ [{<<"priority">>, fun rabbit_parameter_validation:number/2, mandatory},
+ {<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory},
+ {<<"apply-to">>, fun apply_to_validation/2, optional},
+ {<<"definition">>, fun validation_op/2, mandatory}].
+
policy_validation() ->
[{<<"priority">>, fun rabbit_parameter_validation:number/2, mandatory},
{<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory},
{<<"apply-to">>, fun apply_to_validation/2, optional},
{<<"definition">>, fun validation/2, mandatory}].
-validation(_Name, []) ->
+validation_op(Name, Terms) ->
+ validation(Name, Terms, operator_policy_validator).
+
+validation(Name, Terms) ->
+ validation(Name, Terms, policy_validator).
+
+validation(_Name, [], _Validator) ->
{error, "no policy provided", []};
-validation(_Name, Terms) when is_list(Terms) ->
+validation(_Name, Terms, Validator) when is_list(Terms) ->
{Keys, Modules} = lists:unzip(
- rabbit_registry:lookup_all(policy_validator)),
+ rabbit_registry:lookup_all(Validator)),
[] = dups(Keys), %% ASSERTION
Validators = lists:zipwith(fun (M, K) -> {M, a2b(K)} end, Modules, Keys),
case is_proplist(Terms) of
@@ -334,7 +475,7 @@ validation(_Name, Terms) when is_list(Terms) ->
end;
false -> {error, "definition must be a dictionary: ~p", [Terms]}
end;
-validation(_Name, Term) ->
+validation(_Name, Term, _Validator) ->
{error, "parse error while reading policy: ~p", [Term]}.
validation0(Validators, Terms) ->
diff --git a/src/rabbit_policy_merge_strategy.erl b/src/rabbit_policy_merge_strategy.erl
new file mode 100644
index 0000000000..55ad87ccac
--- /dev/null
+++ b/src/rabbit_policy_merge_strategy.erl
@@ -0,0 +1,28 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_policy_merge_strategy).
+
+-behaviour(rabbit_registry_class).
+
+-export([added_to_rabbit_registry/2, removed_from_rabbit_registry/1]).
+
+-callback merge_policy_value(binary(), Value, Value) ->
+ Value
+ when Value :: term().
+
+added_to_rabbit_registry(_Type, _ModuleName) -> ok.
+removed_from_rabbit_registry(_Type) -> ok.
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 3c6f2a4f1f..1b78173ade 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -55,6 +55,7 @@
-rabbit_upgrade({policy_version, mnesia, [recoverable_slaves]}).
-rabbit_upgrade({slave_pids_pending_shutdown, mnesia, [policy_version]}).
-rabbit_upgrade({user_password_hashing, mnesia, [hash_passwords]}).
+-rabbit_upgrade({operator_policies, mnesia, [slave_pids_pending_shutdown, internal_system_x]}).
-rabbit_upgrade({vhost_limits, mnesia, []}).
%% -------------------------------------------------------------------
@@ -495,6 +496,37 @@ slave_pids_pending_shutdown(Table) ->
sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators, state,
policy_version, slave_pids_pending_shutdown]).
+operator_policies() ->
+ ok = exchange_operator_policies(rabbit_exchange),
+ ok = exchange_operator_policies(rabbit_durable_exchange),
+ ok = queue_operator_policies(rabbit_queue),
+ ok = queue_operator_policies(rabbit_durable_queue).
+
+exchange_operator_policies(Table) ->
+ transform(
+ Table,
+ fun ({exchange, Name, Type, Dur, AutoDel, Internal,
+ Args, Scratches, Policy, Decorators}) ->
+ {exchange, Name, Type, Dur, AutoDel, Internal,
+ Args, Scratches, Policy, undefined, Decorators}
+ end,
+ [name, type, durable, auto_delete, internal, arguments, scratches, policy,
+ operator_policy, decorators]).
+
+queue_operator_policies(Table) ->
+ transform(
+ Table,
+ fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators,
+ State, PolicyVersion, SlavePidsPendingShutdown}) ->
+ {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, DSN, Policy, undefined, GmPids,
+ Decorators, State, PolicyVersion, SlavePidsPendingShutdown}
+ end,
+ [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
+ sync_slave_pids, recoverable_slaves, policy, operator_policy,
+ gm_pids, decorators, state, policy_version, slave_pids_pending_shutdown]).
+
%% Prior to 3.6.0, passwords were hashed using MD5, this populates
%% existing records with said default. Users created with 3.6.0+ will
%% have internal_user.hashing_algorithm populated by the internal
diff --git a/test/policy_SUITE.erl b/test/policy_SUITE.erl
new file mode 100644
index 0000000000..0920b25418
--- /dev/null
+++ b/test/policy_SUITE.erl
@@ -0,0 +1,157 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2011-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(policy_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+all() ->
+ [
+ {group, cluster_size_2}
+ ].
+
+groups() ->
+ [
+ {cluster_size_2, [], [
+ policy_ttl,
+ operator_policy_ttl
+ ]}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(cluster_size_2, Config) ->
+ Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodes_count, 2},
+ {rmq_nodename_suffix, Suffix}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_group(_Group, Config) ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_client_helpers:setup_steps(),
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_client_helpers:teardown_steps(),
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+%% -------------------------------------------------------------------
+%% Test cases.
+%% -------------------------------------------------------------------
+
+policy_ttl(Config) ->
+ {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Q = <<"policy_ttl-queue">>,
+ rabbit_ct_broker_helpers:set_policy(Config, 0, <<"ttl-policy">>,
+ <<"policy_ttl-queue">>, <<"all">>, [{<<"message-ttl">>, 20}]),
+
+ declare(Ch, Q),
+ publish(Ch, Q, lists:seq(1, 20)),
+ timer:sleep(50),
+ get_empty(Ch, Q),
+ delete(Ch, Q),
+
+ rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ttl-policy">>),
+
+ rabbit_ct_client_helpers:close_channel(Ch),
+ rabbit_ct_client_helpers:close_connection(Conn),
+ passed.
+
+operator_policy_ttl(Config) ->
+ {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Q = <<"policy_ttl-queue">>,
+ % Operator policy will override
+ rabbit_ct_broker_helpers:set_policy(Config, 0, <<"ttl-policy">>,
+ <<"policy_ttl-queue">>, <<"all">>, [{<<"message-ttl">>, 100000}]),
+ rabbit_ct_broker_helpers:set_operator_policy(Config, 0, <<"ttl-policy-op">>,
+ <<"policy_ttl-queue">>, <<"all">>, [{<<"message-ttl">>, 1}]),
+
+ declare(Ch, Q),
+ publish(Ch, Q, lists:seq(1, 50)),
+ timer:sleep(50),
+ get_empty(Ch, Q),
+ delete(Ch, Q),
+
+ rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ttl-policy">>),
+ rabbit_ct_broker_helpers:clear_operator_policy(Config, 0, <<"ttl-policy-op">>),
+
+ rabbit_ct_client_helpers:close_channel(Ch),
+ rabbit_ct_client_helpers:close_connection(Conn),
+ passed.
+
+%%----------------------------------------------------------------------------
+
+
+declare(Ch, Q) ->
+ amqp_channel:call(Ch, #'queue.declare'{queue = Q,
+ durable = true}).
+
+delete(Ch, Q) ->
+ amqp_channel:call(Ch, #'queue.delete'{queue = Q}).
+
+publish(Ch, Q, Ps) ->
+ amqp_channel:call(Ch, #'confirm.select'{}),
+ [publish1(Ch, Q, P) || P <- Ps],
+ amqp_channel:wait_for_confirms(Ch).
+
+publish1(Ch, Q, P) ->
+ amqp_channel:cast(Ch, #'basic.publish'{routing_key = Q},
+ #amqp_msg{props = props(P),
+ payload = erlang:md5(P)}).
+
+publish1(Ch, Q, P, Pd) ->
+ amqp_channel:cast(Ch, #'basic.publish'{routing_key = Q},
+ #amqp_msg{props = props(P),
+ payload = Pd}).
+
+props(undefined) -> #'P_basic'{delivery_mode = 2};
+props(P) -> #'P_basic'{priority = P,
+ delivery_mode = 2}.
+
+consume(Ch, Q, Ack) ->
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q,
+ no_ack = Ack =:= no_ack,
+ consumer_tag = <<"ctag">>},
+ self()),
+ receive
+ #'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
+ ok
+ end.
+
+get_empty(Ch, Q) ->
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = Q}).
+
+%%----------------------------------------------------------------------------