diff options
| author | Michael Klishin <michael@novemberain.com> | 2016-09-01 19:21:05 +0400 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2016-09-01 19:21:05 +0400 |
| commit | 6fd6548bb68356cceb50679767446a1ea0f3c55d (patch) | |
| tree | 339059817e13a6e17d8a4244f4ec664fe9612b3f | |
| parent | 7b8f649e762c00f792b8472ccb4c059f95ff4fed (diff) | |
| parent | 26376b8d0b9347f5c62e24b33b53f8e02b516c54 (diff) | |
| download | rabbitmq-server-git-6fd6548bb68356cceb50679767446a1ea0f3c55d.tar.gz | |
Merge pull request #938 from rabbitmq/rabbitmq-server-930
Operator policies
| -rw-r--r-- | docs/rabbitmqctl.1.xml | 25 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_control_main.erl | 55 | ||||
| -rw-r--r-- | src/rabbit_policies.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_policy.erl | 283 | ||||
| -rw-r--r-- | src/rabbit_policy_merge_strategy.erl | 28 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 32 | ||||
| -rw-r--r-- | test/policy_SUITE.erl | 157 |
8 files changed, 528 insertions, 83 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index a2ebc0b817..023e8ec0c5 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1209,6 +1209,31 @@ </para> </listitem> </varlistentry> + <varlistentry> + <term><cmdsynopsis><command>set_operator_policy</command> <arg choice="opt">-p <replaceable>vhost</replaceable></arg> <arg choice="opt">--priority <replaceable>priority</replaceable></arg> <arg choice="opt">--apply-to <replaceable>apply-to</replaceable></arg> <arg choice="req"><replaceable>name</replaceable></arg> <arg choice="req"><replaceable>pattern</replaceable></arg> <arg choice="req"><replaceable>definition</replaceable></arg></cmdsynopsis></term> + <listitem> + <para> + Sets an operator policy that overrides a subset of arguments in user policies. Arguments are identical to those of <command>set_policy</command>. + Supported arguments: <command>expires</command>, <command>message-ttl</command>, <command>max-length</command>, and <command>max-length-bytes</command>. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><cmdsynopsis><command>clear_operator_policy</command> <arg choice="opt">-p <replaceable>vhost</replaceable></arg> <arg choice="req"><replaceable>name</replaceable></arg></cmdsynopsis></term> + <listitem> + <para> + Clears an operator policy. Arguments are identical to those of <command>clear_policy</command>. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><cmdsynopsis><command>list_operator_policies</command> <arg choice="opt">-p <replaceable>vhost</replaceable></arg></cmdsynopsis></term> + <listitem> + <para> + Lists operator policy overrides for a virtual host. Arguments are identical to those of <command>list_policies</command>. + </para> + </listitem> + </varlistentry> </variablelist> </refsect2> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 710ec6ab2a..2c42e348ca 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -107,6 +107,8 @@ -define(STATISTICS_KEYS, [name, policy, + operator_policy, + effective_policy_definition, exclusive_consumer_pid, exclusive_consumer_tag, messages_ready, @@ -887,6 +889,16 @@ i(policy, #q{q = Q}) -> none -> ''; Policy -> Policy end; +i(operator_policy, #q{q = Q}) -> + case rabbit_policy:name_op(Q) of + none -> ''; + Policy -> Policy + end; +i(effective_policy_definition, #q{q = Q}) -> + case rabbit_policy:effective_definition(Q) of + undefined -> []; + Def -> Def + end; i(exclusive_consumer_pid, #q{exclusive_consumer = none}) -> ''; i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) -> diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 0e99ac0f7e..f48f0349aa 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -72,7 +72,10 @@ {set_policy, [?VHOST_DEF, ?PRIORITY_DEF, ?APPLY_TO_DEF]}, {clear_policy, [?VHOST_DEF]}, + {set_operator_policy, [?VHOST_DEF, ?PRIORITY_DEF, ?APPLY_TO_DEF]}, + {clear_operator_policy, [?VHOST_DEF]}, {list_policies, [?VHOST_DEF]}, + {list_operator_policies, [?VHOST_DEF]}, {set_vhost_limits, [?VHOST_DEF]}, {clear_vhost_limits, [?VHOST_DEF]}, @@ -287,14 +290,14 @@ action(start_app, Node, [], _Opts, Inform) -> action(reset, Node, [], _Opts, Inform) -> Inform("Resetting node ~p", [Node]), - require_mnesia_stopped(Node, + require_mnesia_stopped(Node, fun() -> call(Node, {rabbit_mnesia, reset, []}) end); action(force_reset, Node, [], _Opts, Inform) -> Inform("Forcefully resetting node ~p", [Node]), - require_mnesia_stopped(Node, + require_mnesia_stopped(Node, fun() -> call(Node, {rabbit_mnesia, force_reset, []}) end); @@ -306,21 +309,21 @@ action(join_cluster, Node, [ClusterNodeS], Opts, Inform) -> false -> disc end, Inform("Clustering node ~p with ~p", [Node, ClusterNode]), - require_mnesia_stopped(Node, + require_mnesia_stopped(Node, fun() -> rpc_call(Node, rabbit_mnesia, join_cluster, [ClusterNode, NodeType]) end); action(change_cluster_node_type, Node, ["ram"], _Opts, Inform) -> Inform("Turning ~p into a ram node", [Node]), - require_mnesia_stopped(Node, + require_mnesia_stopped(Node, fun() -> rpc_call(Node, rabbit_mnesia, change_cluster_node_type, [ram]) end); action(change_cluster_node_type, Node, [Type], _Opts, Inform) when Type =:= "disc" orelse Type =:= "disk" -> Inform("Turning ~p into a disc node", [Node]), - require_mnesia_stopped(Node, + require_mnesia_stopped(Node, fun() -> rpc_call(Node, rabbit_mnesia, change_cluster_node_type, [disc]) end); @@ -328,7 +331,7 @@ action(change_cluster_node_type, Node, [Type], _Opts, Inform) action(update_cluster_nodes, Node, [ClusterNodeS], _Opts, Inform) -> ClusterNode = list_to_atom(ClusterNodeS), Inform("Updating cluster nodes for ~p from ~p", [Node, ClusterNode]), - require_mnesia_stopped(Node, + require_mnesia_stopped(Node, fun() -> rpc_call(Node, rabbit_mnesia, update_cluster_nodes, [ClusterNode]) end); @@ -490,9 +493,9 @@ action(set_disk_free_limit, Node, ["mem_relative", Arg], _Opts, Inform) -> _ -> Arg end), Inform("Setting disk free limit on ~p to ~p of total RAM", [Node, Frac]), - rpc_call(Node, - rabbit_disk_monitor, - set_disk_free_limit, + rpc_call(Node, + rabbit_disk_monitor, + set_disk_free_limit, [{mem_relative, Frac}]); @@ -546,6 +549,27 @@ action(clear_policy, Node, [Key], Opts, Inform) -> Inform("Clearing policy ~p", [Key]), rpc_call(Node, rabbit_policy, delete, [VHostArg, list_to_binary(Key)]); +action(set_operator_policy, Node, [Key, Pattern, Defn], Opts, Inform) -> + Msg = "Setting operator policy override ~p for pattern ~p to ~p with priority ~p", + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + PriorityArg = proplists:get_value(?PRIORITY_OPT, Opts), + ApplyToArg = list_to_binary(proplists:get_value(?APPLY_TO_OPT, Opts)), + Inform(Msg, [Key, Pattern, Defn, PriorityArg]), + Res = rpc_call( + Node, rabbit_policy, parse_set_op, + [VHostArg, list_to_binary(Key), Pattern, Defn, PriorityArg, ApplyToArg]), + case Res of + {error, Format, Args} when is_list(Format) andalso is_list(Args) -> + {error_string, rabbit_misc:format(Format, Args)}; + _ -> + Res + end; + +action(clear_operator_policy, Node, [Key], Opts, Inform) -> + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + Inform("Clearing operator policy ~p", [Key]), + rpc_call(Node, rabbit_policy, delete_op, [VHostArg, list_to_binary(Key)]); + action(set_vhost_limits, Node, [Defn], Opts, Inform) -> VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), Inform("Setting vhost limits for vhost ~p", [VHostArg]), @@ -630,6 +654,14 @@ action(list_policies, Node, [], Opts, Inform, Timeout) -> rabbit_policy:info_keys(), [{timeout, Timeout}]); +action(list_operator_policies, Node, [], Opts, Inform, Timeout) -> + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + Inform("Listing policies", []), + call_emitter(Node, {rabbit_policy, list_formatted_op, [VHostArg]}, + rabbit_policy:info_keys(), + [{timeout, Timeout}]); + + action(list_vhosts, Node, Args, _Opts, Inform, Timeout) -> Inform("Listing vhosts", []), ArgAtoms = default_if_empty(Args, [name]), @@ -898,6 +930,9 @@ format_info_item([T | _] = Value, IsEscaped) lists:nthtail(2, lists:append( [", " ++ format_info_item(E, IsEscaped) || E <- Value])) ++ "]"; +format_info_item({Key, Value}, IsEscaped) -> + "{" ++ io_lib:format("~p", [Key]) ++ ", " ++ + format_info_item(Value, IsEscaped) ++ "}"; format_info_item(Value, _IsEscaped) -> io_lib:format("~w", [Value]). @@ -986,7 +1021,7 @@ escape(Bin, IsEscaped) when is_binary(Bin) -> escape(L, false) when is_list(L) -> escape_char(lists:reverse(L), []); escape(L, true) when is_list(L) -> - L. + L. escape_char([$\\ | T], Acc) -> escape_char(T, [$\\, $\\ | Acc]); diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl index c7d4c99f37..22eb3fde18 100644 --- a/src/rabbit_policies.erl +++ b/src/rabbit_policies.erl @@ -20,10 +20,11 @@ %% validation functions. -behaviour(rabbit_policy_validator). +-behaviour(rabbit_policy_merge_strategy). -include("rabbit.hrl"). --export([register/0, validate_policy/1]). +-export([register/0, validate_policy/1, merge_policy_value/3]). -rabbit_boot_step({?MODULE, [{description, "internal policies"}, @@ -40,7 +41,15 @@ register() -> {policy_validator, <<"expires">>}, {policy_validator, <<"max-length">>}, {policy_validator, <<"max-length-bytes">>}, - {policy_validator, <<"queue-mode">>}]], + {policy_validator, <<"queue-mode">>}, + {operator_policy_validator, <<"expires">>}, + {operator_policy_validator, <<"message-ttl">>}, + {operator_policy_validator, <<"max-length">>}, + {operator_policy_validator, <<"max-length-bytes">>}, + {policy_merge_strategy, <<"expires">>}, + {policy_merge_strategy, <<"message-ttl">>}, + {policy_merge_strategy, <<"max-length">>}, + {policy_merge_strategy, <<"max-length-bytes">>}]], ok. validate_policy(Terms) -> @@ -96,3 +105,9 @@ validate_policy0(<<"queue-mode">>, <<"lazy">>) -> ok; validate_policy0(<<"queue-mode">>, Value) -> {error, "~p is not a valid queue-mode value", [Value]}. + +merge_policy_value(<<"message-ttl">>, Val, OpVal) -> min(Val, OpVal); +merge_policy_value(<<"max-length">>, Val, OpVal) -> min(Val, OpVal); +merge_policy_value(<<"max-length-bytes">>, Val, OpVal) -> min(Val, OpVal); +merge_policy_value(<<"expires">>, Val, OpVal) -> min(Val, OpVal). + diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index a9caadf972..7e39164882 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -38,14 +38,16 @@ -include("rabbit.hrl"). --import(rabbit_misc, [pget/2]). +-import(rabbit_misc, [pget/2, pget/3]). -export([register/0]). -export([invalidate/0, recover/0]). --export([name/1, get/2, get_arg/3, set/1]). +-export([name/1, name_op/1, effective_definition/1, get/2, get_arg/3, set/1]). -export([validate/5, notify/4, notify_clear/3]). -export([parse_set/6, set/6, delete/2, lookup/2, list/0, list/1, list_formatted/1, list_formatted/3, info_keys/0]). +-export([parse_set_op/6, set_op/6, delete_op/2, lookup_op/2, list_op/0, list_op/1, + list_formatted_op/1, list_formatted_op/3]). -rabbit_boot_step({?MODULE, [{description, "policy parameters"}, @@ -54,30 +56,88 @@ {enables, recovery}]}). register() -> - rabbit_registry:register(runtime_parameter, <<"policy">>, ?MODULE). + rabbit_registry:register(runtime_parameter, <<"policy">>, ?MODULE), + rabbit_registry:register(runtime_parameter, <<"operator_policy">>, ?MODULE). name(#amqqueue{policy = Policy}) -> name0(Policy); name(#exchange{policy = Policy}) -> name0(Policy). +name_op(#amqqueue{operator_policy = Policy}) -> name0(Policy); +name_op(#exchange{operator_policy = Policy}) -> name0(Policy). + name0(undefined) -> none; name0(Policy) -> pget(name, Policy). -set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)}; -set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}. - -set0(Name = #resource{virtual_host = VHost}) -> match(Name, list(VHost)). +effective_definition(#amqqueue{policy = Policy, operator_policy = OpPolicy}) -> + effective_definition0(Policy, OpPolicy); +effective_definition(#exchange{policy = Policy, operator_policy = OpPolicy}) -> + effective_definition0(Policy, OpPolicy). + +effective_definition0(undefined, undefined) -> undefined; +effective_definition0(Policy, undefined) -> pget(definition, Policy); +effective_definition0(undefined, OpPolicy) -> pget(definition, OpPolicy); +effective_definition0(Policy, OpPolicy) -> + OpDefinition = pget(definition, OpPolicy, []), + Definition = pget(definition, Policy, []), + {Keys, _} = lists:unzip(Definition), + {OpKeys, _} = lists:unzip(OpDefinition), + lists:map(fun(Key) -> + case {pget(Key, Definition), pget(Key, OpDefinition)} of + {Val, undefined} -> {Key, Val}; + {undefined, Val} -> {Key, Val}; + {Val, OpVal} -> {Key, merge_policy_value(Key, Val, OpVal)} + end + end, + lists:umerge(Keys, OpKeys)). + +set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = match(Name), + operator_policy = match_op(Name)}; +set(X = #exchange{name = Name}) -> X#exchange{policy = match(Name), + operator_policy = match_op(Name)}. + +match(Name = #resource{virtual_host = VHost}) -> + match(Name, list(VHost)). + +match_op(Name = #resource{virtual_host = VHost}) -> + match(Name, list_op(VHost)). + +get(Name, #amqqueue{policy = Policy, operator_policy = OpPolicy}) -> + get0(Name, Policy, OpPolicy); +get(Name, #exchange{policy = Policy, operator_policy = OpPolicy}) -> + get0(Name, Policy, OpPolicy); -get(Name, #amqqueue{policy = Policy}) -> get0(Name, Policy); -get(Name, #exchange{policy = Policy}) -> get0(Name, Policy); %% Caution - SLOW. get(Name, EntityName = #resource{virtual_host = VHost}) -> - get0(Name, match(EntityName, list(VHost))). + get0(Name, + match(EntityName, list(VHost)), + match(EntityName, list_op(VHost))). + +get0(_Name, undefined, undefined) -> undefined; +get0(Name, undefined, OpPolicy) -> pget(Name, pget(definition, OpPolicy, [])); +get0(Name, Policy, undefined) -> pget(Name, pget(definition, Policy, [])); +get0(Name, Policy, OpPolicy) -> + OpDefinition = pget(definition, OpPolicy, []), + Definition = pget(definition, Policy, []), + case {pget(Name, Definition), pget(Name, OpDefinition)} of + {undefined, undefined} -> undefined; + {Val, undefined} -> Val; + {undefined, Val} -> Val; + {Val, OpVal} -> merge_policy_value(Name, Val, OpVal) + end. + +merge_policy_value(Name, PolicyVal, OpVal) -> + case policy_merge_strategy(Name) of + {ok, Module} -> Module:merge_policy_value(Name, PolicyVal, OpVal); + {error, not_found} -> PolicyVal + end. -get0(_Name, undefined) -> undefined; -get0(Name, List) -> case pget(definition, List) of - undefined -> undefined; - Policy -> pget(Name, Policy) - end. +policy_merge_strategy(Name) -> + case rabbit_registry:binary_to_type(Name) of + {error, not_found} -> + {error, not_found}; + T -> + rabbit_registry:lookup_module(policy_merge_strategy, T) + end. %% Many heads for optimisation get_arg(_AName, _PName, #exchange{arguments = [], policy = undefined}) -> @@ -112,19 +172,24 @@ recover0() -> Xs = mnesia:dirty_match_object(rabbit_durable_exchange, #exchange{_ = '_'}), Qs = mnesia:dirty_match_object(rabbit_durable_queue, #amqqueue{_ = '_'}), Policies = list(), + OpPolicies = list_op(), [rabbit_misc:execute_mnesia_transaction( fun () -> mnesia:write( rabbit_durable_exchange, rabbit_exchange_decorator:set( - X#exchange{policy = match(Name, Policies)}), write) + X#exchange{policy = match(Name, Policies), + operator_policy = match(Name, OpPolicies)}), + write) end) || X = #exchange{name = Name} <- Xs], [rabbit_misc:execute_mnesia_transaction( fun () -> mnesia:write( rabbit_durable_queue, rabbit_queue_decorator:set( - Q#amqqueue{policy = match(Name, Policies)}), write) + Q#amqqueue{policy = match(Name, Policies), + operator_policy = match(Name, OpPolicies)}), + write) end) || Q = #amqqueue{name = Name} <- Qs], ok. @@ -133,17 +198,23 @@ invalid_file() -> %%---------------------------------------------------------------------------- +parse_set_op(VHost, Name, Pattern, Definition, Priority, ApplyTo) -> + parse_set(<<"operator_policy">>, VHost, Name, Pattern, Definition, Priority, ApplyTo). + parse_set(VHost, Name, Pattern, Definition, Priority, ApplyTo) -> + parse_set(<<"policy">>, VHost, Name, Pattern, Definition, Priority, ApplyTo). + +parse_set(Type, VHost, Name, Pattern, Definition, Priority, ApplyTo) -> try list_to_integer(Priority) of - Num -> parse_set0(VHost, Name, Pattern, Definition, Num, ApplyTo) + Num -> parse_set0(Type, VHost, Name, Pattern, Definition, Num, ApplyTo) catch error:badarg -> {error, "~p priority must be a number", [Priority]} end. -parse_set0(VHost, Name, Pattern, Defn, Priority, ApplyTo) -> +parse_set0(Type, VHost, Name, Pattern, Defn, Priority, ApplyTo) -> case rabbit_misc:json_decode(Defn) of {ok, JSON} -> - set0(VHost, Name, + set0(Type, VHost, Name, [{<<"pattern">>, list_to_binary(Pattern)}, {<<"definition">>, rabbit_misc:json_to_term(JSON)}, {<<"priority">>, Priority}, @@ -152,7 +223,13 @@ parse_set0(VHost, Name, Pattern, Defn, Priority, ApplyTo) -> {error_string, "JSON decoding error"} end. +set_op(VHost, Name, Pattern, Definition, Priority, ApplyTo) -> + set(<<"operator_policy">>, VHost, Name, Pattern, Definition, Priority, ApplyTo). + set(VHost, Name, Pattern, Definition, Priority, ApplyTo) -> + set(<<"policy">>, VHost, Name, Pattern, Definition, Priority, ApplyTo). + +set(Type, VHost, Name, Pattern, Definition, Priority, ApplyTo) -> PolicyProps = [{<<"pattern">>, Pattern}, {<<"definition">>, Definition}, {<<"priority">>, case Priority of @@ -163,20 +240,47 @@ set(VHost, Name, Pattern, Definition, Priority, ApplyTo) -> undefined -> <<"all">>; _ -> ApplyTo end}], - set0(VHost, Name, PolicyProps). + set0(Type, VHost, Name, PolicyProps). -set0(VHost, Name, Term) -> - rabbit_runtime_parameters:set_any(VHost, <<"policy">>, Name, Term, none). +set0(Type, VHost, Name, Term) -> + rabbit_runtime_parameters:set_any(VHost, Type, Name, Term, none). + +delete_op(VHost, Name) -> + rabbit_runtime_parameters:clear_any(VHost, <<"operator_policy">>, Name). delete(VHost, Name) -> rabbit_runtime_parameters:clear_any(VHost, <<"policy">>, Name). +lookup_op(VHost, Name) -> + case rabbit_runtime_parameters:lookup(VHost, <<"operator_policy">>, Name) of + not_found -> not_found; + P -> p(P, fun ident/1) + end. + lookup(VHost, Name) -> case rabbit_runtime_parameters:lookup(VHost, <<"policy">>, Name) of not_found -> not_found; P -> p(P, fun ident/1) end. +list_op() -> + list_op('_'). + +list_op(VHost) -> + list0_op(VHost, fun ident/1). + +list_formatted_op(VHost) -> + order_policies(list0_op(VHost, fun format/1)). + +list_formatted_op(VHost, Ref, AggregatorPid) -> + rabbit_control_misc:emitting_map(AggregatorPid, Ref, + fun(P) -> P end, list_formatted_op(VHost)). + +list0_op(VHost, DefnFun) -> + [p(P, DefnFun) + || P <- rabbit_runtime_parameters:list(VHost, <<"operator_policy">>)]. + + list() -> list('_'). @@ -194,8 +298,7 @@ list0(VHost, DefnFun) -> [p(P, DefnFun) || P <- rabbit_runtime_parameters:list(VHost, <<"policy">>)]. order_policies(PropList) -> - lists:sort(fun (A, B) -> pget(priority, A) < pget(priority, B) end, - PropList). + lists:sort(fun (A, B) -> not sort_pred(A, B) end, PropList). p(Parameter, DefnFun) -> Value = pget(value, Parameter), @@ -218,14 +321,23 @@ info_keys() -> [vhost, name, 'apply-to', pattern, definition, priority]. validate(_VHost, <<"policy">>, Name, Term, _User) -> rabbit_parameter_validation:proplist( - Name, policy_validation(), Term). + Name, policy_validation(), Term); +validate(_VHost, <<"operator_policy">>, Name, Term, _User) -> + rabbit_parameter_validation:proplist( + Name, operator_policy_validation(), Term). notify(VHost, <<"policy">>, Name, Term) -> rabbit_event:notify(policy_set, [{name, Name}, {vhost, VHost} | Term]), + update_policies(VHost); +notify(VHost, <<"operator_policy">>, Name, Term) -> + rabbit_event:notify(policy_set, [{name, Name}, {vhost, VHost} | Term]), update_policies(VHost). notify_clear(VHost, <<"policy">>, Name) -> rabbit_event:notify(policy_cleared, [{name, Name}, {vhost, VHost}]), + update_policies(VHost); +notify_clear(VHost, <<"operator_policy">>, Name) -> + rabbit_event:notify(operator_policy_cleared, [{name, Name}, {vhost, VHost}]), update_policies(VHost). %%---------------------------------------------------------------------------- @@ -239,50 +351,64 @@ update_policies(VHost) -> Tabs = [rabbit_queue, rabbit_durable_queue, rabbit_exchange, rabbit_durable_exchange], {Xs, Qs} = rabbit_misc:execute_mnesia_transaction( - fun() -> - [mnesia:lock({table, T}, write) || T <- Tabs], %% [1] - case catch list(VHost) of - {'EXIT', {throw, {error, {no_such_vhost, _}}}} -> - {[], []}; %% [2] - {'EXIT', Exit} -> - exit(Exit); - Policies -> - {[update_exchange(X, Policies) || - X <- rabbit_exchange:list(VHost)], - [update_queue(Q, Policies) || - Q <- rabbit_amqqueue:list(VHost)]} - end - end), + fun() -> + [mnesia:lock({table, T}, write) || T <- Tabs], %% [1] + case catch {list(VHost), list_op(VHost)} of + {'EXIT', {throw, {error, {no_such_vhost, _}}}} -> + {[], []}; %% [2] + {'EXIT', Exit} -> + exit(Exit); + {Policies, OpPolicies} -> + {[update_exchange(X, Policies, OpPolicies) || + X <- rabbit_exchange:list(VHost)], + [update_queue(Q, Policies, OpPolicies) || + Q <- rabbit_amqqueue:list(VHost)]} + end + end), [catch notify(X) || X <- Xs], [catch notify(Q) || Q <- Qs], ok. -update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) -> - case match(XName, Policies) of - OldPolicy -> no_change; - NewPolicy -> case rabbit_exchange:update( - XName, fun (X0) -> - rabbit_exchange_decorator:set( - X0 #exchange{policy = NewPolicy}) - end) of - #exchange{} = X1 -> {X, X1}; - not_found -> {X, X } - end +update_exchange(X = #exchange{name = XName, + policy = OldPolicy, + operator_policy = OldOpPolicy}, + Policies, OpPolicies) -> + case {match(XName, Policies), match(XName, OpPolicies)} of + {OldPolicy, OldOpPolicy} -> no_change; + {NewPolicy, NewOpPolicy} -> + NewExchange = rabbit_exchange:update( + XName, + fun(X0) -> + rabbit_exchange_decorator:set( + X0 #exchange{policy = NewPolicy, + operator_policy = NewOpPolicy}) + end), + case NewExchange of + #exchange{} = X1 -> {X, X1}; + not_found -> {X, X } + end end. -update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) -> - case match(QName, Policies) of - OldPolicy -> no_change; - NewPolicy -> case rabbit_amqqueue:update( - QName, fun(Q1) -> - rabbit_queue_decorator:set( - Q1#amqqueue{policy = NewPolicy, - policy_version = - Q1#amqqueue.policy_version + 1 }) - end) of - #amqqueue{} = Q1 -> {Q, Q1}; - not_found -> {Q, Q } - end +update_queue(Q = #amqqueue{name = QName, + policy = OldPolicy, + operator_policy = OldOpPolicy}, + Policies, OpPolicies) -> + case {match(QName, Policies), match(QName, OpPolicies)} of + {OldPolicy, OldOpPolicy} -> no_change; + {NewPolicy, NewOpPolicy} -> + NewQueue = rabbit_amqqueue:update( + QName, + fun(Q1) -> + rabbit_queue_decorator:set( + Q1#amqqueue{policy = NewPolicy, + operator_policy = NewOpPolicy, + policy_version = + Q1#amqqueue.policy_version + 1 }) + end), + case NewQueue of + #amqqueue{} = Q1 -> {Q, Q1}; + not_found -> {Q, Q } + end end. notify(no_change)-> @@ -293,11 +419,14 @@ notify({Q1 = #amqqueue{}, Q2 = #amqqueue{}}) -> rabbit_amqqueue:policy_changed(Q1, Q2). match(Name, Policies) -> - case lists:sort(fun sort_pred/2, [P || P <- Policies, matches(Name, P)]) of - [] -> undefined; - [Policy | _Rest] -> Policy + case match_all(Name, Policies) of + [] -> undefined; + [Policy | _] -> Policy end. +match_all(Name, Policies) -> + lists:sort(fun sort_pred/2, [P || P <- Policies, matches(Name, P)]). + matches(#resource{name = Name, kind = Kind, virtual_host = VHost}, Policy) -> matches_type(Kind, pget('apply-to', Policy)) andalso match =:= re:run(Name, pget(pattern, Policy), [{capture, none}]) andalso @@ -313,17 +442,29 @@ sort_pred(A, B) -> pget(priority, A) >= pget(priority, B). %%---------------------------------------------------------------------------- +operator_policy_validation() -> + [{<<"priority">>, fun rabbit_parameter_validation:number/2, mandatory}, + {<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory}, + {<<"apply-to">>, fun apply_to_validation/2, optional}, + {<<"definition">>, fun validation_op/2, mandatory}]. + policy_validation() -> [{<<"priority">>, fun rabbit_parameter_validation:number/2, mandatory}, {<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory}, {<<"apply-to">>, fun apply_to_validation/2, optional}, {<<"definition">>, fun validation/2, mandatory}]. -validation(_Name, []) -> +validation_op(Name, Terms) -> + validation(Name, Terms, operator_policy_validator). + +validation(Name, Terms) -> + validation(Name, Terms, policy_validator). + +validation(_Name, [], _Validator) -> {error, "no policy provided", []}; -validation(_Name, Terms) when is_list(Terms) -> +validation(_Name, Terms, Validator) when is_list(Terms) -> {Keys, Modules} = lists:unzip( - rabbit_registry:lookup_all(policy_validator)), + rabbit_registry:lookup_all(Validator)), [] = dups(Keys), %% ASSERTION Validators = lists:zipwith(fun (M, K) -> {M, a2b(K)} end, Modules, Keys), case is_proplist(Terms) of @@ -334,7 +475,7 @@ validation(_Name, Terms) when is_list(Terms) -> end; false -> {error, "definition must be a dictionary: ~p", [Terms]} end; -validation(_Name, Term) -> +validation(_Name, Term, _Validator) -> {error, "parse error while reading policy: ~p", [Term]}. validation0(Validators, Terms) -> diff --git a/src/rabbit_policy_merge_strategy.erl b/src/rabbit_policy_merge_strategy.erl new file mode 100644 index 0000000000..55ad87ccac --- /dev/null +++ b/src/rabbit_policy_merge_strategy.erl @@ -0,0 +1,28 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_policy_merge_strategy). + +-behaviour(rabbit_registry_class). + +-export([added_to_rabbit_registry/2, removed_from_rabbit_registry/1]). + +-callback merge_policy_value(binary(), Value, Value) -> + Value + when Value :: term(). + +added_to_rabbit_registry(_Type, _ModuleName) -> ok. +removed_from_rabbit_registry(_Type) -> ok. diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 3c6f2a4f1f..1b78173ade 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -55,6 +55,7 @@ -rabbit_upgrade({policy_version, mnesia, [recoverable_slaves]}). -rabbit_upgrade({slave_pids_pending_shutdown, mnesia, [policy_version]}). -rabbit_upgrade({user_password_hashing, mnesia, [hash_passwords]}). +-rabbit_upgrade({operator_policies, mnesia, [slave_pids_pending_shutdown, internal_system_x]}). -rabbit_upgrade({vhost_limits, mnesia, []}). %% ------------------------------------------------------------------- @@ -495,6 +496,37 @@ slave_pids_pending_shutdown(Table) -> sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators, state, policy_version, slave_pids_pending_shutdown]). +operator_policies() -> + ok = exchange_operator_policies(rabbit_exchange), + ok = exchange_operator_policies(rabbit_durable_exchange), + ok = queue_operator_policies(rabbit_queue), + ok = queue_operator_policies(rabbit_durable_queue). + +exchange_operator_policies(Table) -> + transform( + Table, + fun ({exchange, Name, Type, Dur, AutoDel, Internal, + Args, Scratches, Policy, Decorators}) -> + {exchange, Name, Type, Dur, AutoDel, Internal, + Args, Scratches, Policy, undefined, Decorators} + end, + [name, type, durable, auto_delete, internal, arguments, scratches, policy, + operator_policy, decorators]). + +queue_operator_policies(Table) -> + transform( + Table, + fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators, + State, PolicyVersion, SlavePidsPendingShutdown}) -> + {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, DSN, Policy, undefined, GmPids, + Decorators, State, PolicyVersion, SlavePidsPendingShutdown} + end, + [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, + sync_slave_pids, recoverable_slaves, policy, operator_policy, + gm_pids, decorators, state, policy_version, slave_pids_pending_shutdown]). + %% Prior to 3.6.0, passwords were hashed using MD5, this populates %% existing records with said default. Users created with 3.6.0+ will %% have internal_user.hashing_algorithm populated by the internal diff --git a/test/policy_SUITE.erl b/test/policy_SUITE.erl new file mode 100644 index 0000000000..0920b25418 --- /dev/null +++ b/test/policy_SUITE.erl @@ -0,0 +1,157 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2011-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(policy_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-compile(export_all). + +all() -> + [ + {group, cluster_size_2} + ]. + +groups() -> + [ + {cluster_size_2, [], [ + policy_ttl, + operator_policy_ttl + ]} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(cluster_size_2, Config) -> + Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodes_count, 2}, + {rmq_nodename_suffix, Suffix} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_group(_Group, Config) -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_client_helpers:setup_steps(), + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_client_helpers:teardown_steps(), + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +%% ------------------------------------------------------------------- +%% Test cases. +%% ------------------------------------------------------------------- + +policy_ttl(Config) -> + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Q = <<"policy_ttl-queue">>, + rabbit_ct_broker_helpers:set_policy(Config, 0, <<"ttl-policy">>, + <<"policy_ttl-queue">>, <<"all">>, [{<<"message-ttl">>, 20}]), + + declare(Ch, Q), + publish(Ch, Q, lists:seq(1, 20)), + timer:sleep(50), + get_empty(Ch, Q), + delete(Ch, Q), + + rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ttl-policy">>), + + rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection(Conn), + passed. + +operator_policy_ttl(Config) -> + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Q = <<"policy_ttl-queue">>, + % Operator policy will override + rabbit_ct_broker_helpers:set_policy(Config, 0, <<"ttl-policy">>, + <<"policy_ttl-queue">>, <<"all">>, [{<<"message-ttl">>, 100000}]), + rabbit_ct_broker_helpers:set_operator_policy(Config, 0, <<"ttl-policy-op">>, + <<"policy_ttl-queue">>, <<"all">>, [{<<"message-ttl">>, 1}]), + + declare(Ch, Q), + publish(Ch, Q, lists:seq(1, 50)), + timer:sleep(50), + get_empty(Ch, Q), + delete(Ch, Q), + + rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ttl-policy">>), + rabbit_ct_broker_helpers:clear_operator_policy(Config, 0, <<"ttl-policy-op">>), + + rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection(Conn), + passed. + +%%---------------------------------------------------------------------------- + + +declare(Ch, Q) -> + amqp_channel:call(Ch, #'queue.declare'{queue = Q, + durable = true}). + +delete(Ch, Q) -> + amqp_channel:call(Ch, #'queue.delete'{queue = Q}). + +publish(Ch, Q, Ps) -> + amqp_channel:call(Ch, #'confirm.select'{}), + [publish1(Ch, Q, P) || P <- Ps], + amqp_channel:wait_for_confirms(Ch). + +publish1(Ch, Q, P) -> + amqp_channel:cast(Ch, #'basic.publish'{routing_key = Q}, + #amqp_msg{props = props(P), + payload = erlang:md5(P)}). + +publish1(Ch, Q, P, Pd) -> + amqp_channel:cast(Ch, #'basic.publish'{routing_key = Q}, + #amqp_msg{props = props(P), + payload = Pd}). + +props(undefined) -> #'P_basic'{delivery_mode = 2}; +props(P) -> #'P_basic'{priority = P, + delivery_mode = 2}. + +consume(Ch, Q, Ack) -> + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, + no_ack = Ack =:= no_ack, + consumer_tag = <<"ctag">>}, + self()), + receive + #'basic.consume_ok'{consumer_tag = <<"ctag">>} -> + ok + end. + +get_empty(Ch, Q) -> + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = Q}). + +%%---------------------------------------------------------------------------- |
