diff options
| author | Daniil Fedotov <dfedotov@pivotal.io> | 2016-08-25 17:06:55 +0100 |
|---|---|---|
| committer | Daniil Fedotov <dfedotov@pivotal.io> | 2016-08-25 17:06:55 +0100 |
| commit | 0593afdd05bed1619e197487ee1170a1b95cf6eb (patch) | |
| tree | 3ba123ba304a0900d5d29bcc9f8172fa92281749 | |
| parent | 15116daafe3f1ef1b55427c35b27c51a5cf2592d (diff) | |
| download | rabbitmq-server-git-0593afdd05bed1619e197487ee1170a1b95cf6eb.tar.gz | |
Operator policy as a separate queue field
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_control_main.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_policies.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_policy.erl | 232 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 32 |
5 files changed, 203 insertions, 92 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 710ec6ab2a..2c42e348ca 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -107,6 +107,8 @@ -define(STATISTICS_KEYS, [name, policy, + operator_policy, + effective_policy_definition, exclusive_consumer_pid, exclusive_consumer_tag, messages_ready, @@ -887,6 +889,16 @@ i(policy, #q{q = Q}) -> none -> ''; Policy -> Policy end; +i(operator_policy, #q{q = Q}) -> + case rabbit_policy:name_op(Q) of + none -> ''; + Policy -> Policy + end; +i(effective_policy_definition, #q{q = Q}) -> + case rabbit_policy:effective_definition(Q) of + undefined -> []; + Def -> Def + end; i(exclusive_consumer_pid, #q{exclusive_consumer = none}) -> ''; i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) -> diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 13ca805bfd..1ef2518691 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -72,7 +72,10 @@ {set_policy, [?VHOST_DEF, ?PRIORITY_DEF, ?APPLY_TO_DEF]}, {clear_policy, [?VHOST_DEF]}, + {set_operator_policy, [?VHOST_DEF, ?PRIORITY_DEF, ?APPLY_TO_DEF]}, + {clear_operator_policy, [?VHOST_DEF]}, {list_policies, [?VHOST_DEF]}, + {list_operator_policies, [?VHOST_DEF]}, {list_queues, [?VHOST_DEF, ?OFFLINE_DEF, ?ONLINE_DEF, ?LOCAL_DEF]}, {list_exchanges, [?VHOST_DEF]}, @@ -639,6 +642,14 @@ action(list_policies, Node, [], Opts, Inform, Timeout) -> rabbit_policy:info_keys(), [{timeout, Timeout}]); +action(list_operator_policies, Node, [], Opts, Inform, Timeout) -> + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + Inform("Listing policies", []), + call_emitter(Node, {rabbit_policy, list_formatted_op, [VHostArg]}, + rabbit_policy:info_keys(), + [{timeout, Timeout}]); + + action(list_vhosts, Node, Args, _Opts, Inform, Timeout) -> Inform("Listing vhosts", []), ArgAtoms = default_if_empty(Args, [name]), @@ -907,6 +918,9 @@ format_info_item([T | _] = Value, IsEscaped) lists:nthtail(2, lists:append( [", " ++ format_info_item(E, IsEscaped) || E <- Value])) ++ "]"; +format_info_item({Key, Value}, IsEscaped) -> + "{" ++ io_lib:format("~p", [Key]) ++ ", " ++ + format_info_item(Value, IsEscaped) ++ "}"; format_info_item(Value, _IsEscaped) -> io_lib:format("~w", [Value]). diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl index c7d4c99f37..94bbd36418 100644 --- a/src/rabbit_policies.erl +++ b/src/rabbit_policies.erl @@ -40,7 +40,10 @@ register() -> {policy_validator, <<"expires">>}, {policy_validator, <<"max-length">>}, {policy_validator, <<"max-length-bytes">>}, - {policy_validator, <<"queue-mode">>}]], + {policy_validator, <<"queue-mode">>}, + {operator_policy_validator, <<"message-ttl">>}, + {operator_policy_validator, <<"max-length">>}, + {operator_policy_validator, <<"max-length-bytes">>}]], ok. validate_policy(Terms) -> diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index 59367fb182..89cbdcf9d4 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -38,11 +38,11 @@ -include("rabbit.hrl"). --import(rabbit_misc, [pget/2]). +-import(rabbit_misc, [pget/2, pget/3]). -export([register/0]). -export([invalidate/0, recover/0]). --export([name/1, get/2, get_arg/3, set/1]). +-export([name/1, name_op/1, effective_definition/1, get/2, get_arg/3, set/1]). -export([validate/5, notify/4, notify_clear/3]). -export([parse_set/6, set/6, delete/2, lookup/2, list/0, list/1, list_formatted/1, list_formatted/3, info_keys/0]). @@ -56,34 +56,86 @@ {enables, recovery}]}). register() -> - rabbit_registry:register(runtime_parameter, <<"policy">>, ?MODULE). - -register() -> + rabbit_registry:register(runtime_parameter, <<"policy">>, ?MODULE), rabbit_registry:register(runtime_parameter, <<"operator_policy">>, ?MODULE). name(#amqqueue{policy = Policy}) -> name0(Policy); name(#exchange{policy = Policy}) -> name0(Policy). +name_op(#amqqueue{operator_policy = Policy}) -> name0(Policy); +name_op(#exchange{operator_policy = Policy}) -> name0(Policy). + name0(undefined) -> none; name0(Policy) -> pget(name, Policy). -set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)}; -set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}. +effective_definition(#amqqueue{policy = Policy, operator_policy = OpPolicy}) -> + effective_definition0(Policy, OpPolicy); +effective_definition(#exchange{policy = Policy, operator_policy = OpPolicy}) -> + effective_definition0(Policy, OpPolicy). + +effective_definition0(undefined, undefined) -> undefined; +effective_definition0(Policy, undefined) -> pget(definition, Policy); +effective_definition0(undefined, OpPolicy) -> pget(definition, OpPolicy); +effective_definition0(Policy, OpPolicy) -> + OpDefinition = pget(definition, OpPolicy, []), + Definition = pget(definition, Policy, []), + {Keys, _} = lists:unzip(Definition), + {OpKeys, _} = lists:unzip(OpDefinition), + lists:map(fun(Key) -> + case {pget(Key, Definition), pget(Key, OpDefinition)} of + {undefined, Val} -> {Key, Val}; + {Val, undefined} -> {Key, Val}; + {Val, OpVal} -> {Key, merge_policy_value(Key, Val, OpVal)} + end + end, + lists:umerge(Keys, OpKeys)). + +set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = match(Name), + operator_policy = match_op(Name)}; +set(X = #exchange{name = Name}) -> X#exchange{policy = match(Name), + operator_policy = match_op(Name)}. + +match(Name = #resource{virtual_host = VHost}) -> + match(Name, list(VHost)). -set0(Name = #resource{virtual_host = VHost}) -> - match(Name, list(VHost), list_op(VHost)). +match_op(Name = #resource{virtual_host = VHost}) -> + match(Name, list_op(VHost)). + +get(Name, #amqqueue{policy = Policy, operator_policy = OpPolicy}) -> + get0(Name, Policy, OpPolicy); +get(Name, #exchange{policy = Policy, operator_policy = OpPolicy}) -> + get0(Name, Policy, OpPolicy); -get(Name, #amqqueue{policy = Policy}) -> get0(Name, Policy); -get(Name, #exchange{policy = Policy}) -> get0(Name, Policy); %% Caution - SLOW. get(Name, EntityName = #resource{virtual_host = VHost}) -> - get0(Name, match(EntityName, list(VHost), list_op(VHost))). + get0(Name, + match(EntityName, list(VHost)), + match(EntityName, list_op(VHost))). + +get0(_Name, undefined, undefined) -> undefined; +get0(Name, undefined, OpPolicy) -> pget(Name, pget(definition, OpPolicy, [])); +get0(Name, Policy, undefined) -> pget(Name, pget(definition, Policy, [])); +get0(Name, Policy, OpPolicy) -> + OpDefinition = pget(definition, OpPolicy, []), + Definition = pget(definition, Policy, []), + case {pget(Name, Definition), pget(Name, OpDefinition)} of + {undefined, undefined} -> undefined; + {Val, undefined} -> Val; + {undefined, Val} -> Val; + {Val, OpVal} -> merge_policy_value(Name, Val, OpVal) + end. + +merge_policy_value(Name, PolicyVal, OpVal) -> + case policy_merge_strategy(Name) of + undefined -> PolicyVal; + Strategy -> Strategy(PolicyVal, OpVal) + end. + +policy_merge_strategy(<<"message-ttl">>) -> fun erlang:min/2; +policy_merge_strategy(<<"max-length">>) -> fun erlang:min/2; +policy_merge_strategy(<<"max-length-bytes">>) -> fun erlang:min/2; +policy_merge_strategy(_) -> undefined. -get0(_Name, undefined) -> undefined; -get0(Name, List) -> case pget(definition, List) of - undefined -> undefined; - Policy -> pget(Name, Policy) - end. %% Many heads for optimisation get_arg(_AName, _PName, #exchange{arguments = [], policy = undefined}) -> @@ -124,14 +176,18 @@ recover0() -> mnesia:write( rabbit_durable_exchange, rabbit_exchange_decorator:set( - X#exchange{policy = match(Name, Policies, OpPolicies)}), write) + X#exchange{policy = match(Name, Policies), + operator_policy = match(Name, OpPolicies)}), + write) end) || X = #exchange{name = Name} <- Xs], [rabbit_misc:execute_mnesia_transaction( fun () -> mnesia:write( rabbit_durable_queue, rabbit_queue_decorator:set( - Q#amqqueue{policy = match(Name, Policies, OpPolicies)}), write) + Q#amqqueue{policy = match(Name, Policies), + operator_policy = match(Name, OpPolicies)}), + write) end) || Q = #amqqueue{name = Name} <- Qs], ok. @@ -263,25 +319,21 @@ info_keys() -> [vhost, name, 'apply-to', pattern, definition, priority]. validate(_VHost, <<"policy">>, Name, Term, _User) -> rabbit_parameter_validation:proplist( - Name, policy_validation(), Term). - -notify(VHost, <<"policy">>, Name, Term) -> - rabbit_event:notify(policy_set, [{name, Name}, {vhost, VHost} | Term]), - update_policies(VHost). - -notify_clear(VHost, <<"policy">>, Name) -> - rabbit_event:notify(policy_cleared, [{name, Name}, {vhost, VHost}]), - update_policies(VHost). - -% TODO: copy-paste. Check + Name, policy_validation(), Term); validate(_VHost, <<"operator_policy">>, Name, Term, _User) -> rabbit_parameter_validation:proplist( - Name, policy_validation(), Term). + Name, operator_policy_validation(), Term). +notify(VHost, <<"policy">>, Name, Term) -> + rabbit_event:notify(policy_set, [{name, Name}, {vhost, VHost} | Term]), + update_policies(VHost); notify(VHost, <<"operator_policy">>, Name, Term) -> rabbit_event:notify(policy_set, [{name, Name}, {vhost, VHost} | Term]), update_policies(VHost). +notify_clear(VHost, <<"policy">>, Name) -> + rabbit_event:notify(policy_cleared, [{name, Name}, {vhost, VHost}]), + update_policies(VHost); notify_clear(VHost, <<"operator_policy">>, Name) -> rabbit_event:notify(policy_cleared, [{name, Name}, {vhost, VHost}]), update_policies(VHost). @@ -315,32 +367,46 @@ update_policies(VHost) -> [catch notify(Q) || Q <- Qs], ok. -update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies, OpPolicies) -> - case match(XName, Policies, OpPolicies) of - OldPolicy -> no_change; - NewPolicy -> case rabbit_exchange:update( - XName, fun (X0) -> - rabbit_exchange_decorator:set( - X0 #exchange{policy = NewPolicy}) - end) of - #exchange{} = X1 -> {X, X1}; - not_found -> {X, X } - end +update_exchange(X = #exchange{name = XName, + policy = OldPolicy, + operator_policy = OldOpPolicy}, + Policies, OpPolicies) -> + case {match(XName, Policies), match(XName, OpPolicies)} of + {OldPolicy, OldOpPolicy} -> no_change; + {NewPolicy, NewOpPolicy} -> + NewExchange = rabbit_exchange:update( + XName, + fun(X0) -> + rabbit_exchange_decorator:set( + X0 #exchange{policy = NewPolicy, + operator_policy = NewOpPolicy}) + end), + case NewExchange of + #exchange{} = X1 -> {X, X1}; + not_found -> {X, X } + end end. -update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies, OpPolicies) -> - case match(QName, Policies, OpPolicies) of - OldPolicy -> no_change; - NewPolicy -> case rabbit_amqqueue:update( - QName, fun(Q1) -> - rabbit_queue_decorator:set( - Q1#amqqueue{policy = NewPolicy, - policy_version = - Q1#amqqueue.policy_version + 1 }) - end) of - #amqqueue{} = Q1 -> {Q, Q1}; - not_found -> {Q, Q } - end +update_queue(Q = #amqqueue{name = QName, + policy = OldPolicy, + operator_policy = OldOpPolicy}, + Policies, OpPolicies) -> + case {match(QName, Policies), match(QName, OpPolicies)} of + {OldPolicy, OldOpPolicy} -> no_change; + {NewPolicy, NewOpPolicy} -> + NewQueue = rabbit_amqqueue:update( + QName, + fun(Q1) -> + rabbit_queue_decorator:set( + Q1#amqqueue{policy = NewPolicy, + operator_policy = NewOpPolicy, + policy_version = + Q1#amqqueue.policy_version + 1 }) + end), + case NewQueue of + #amqqueue{} = Q1 -> {Q, Q1}; + not_found -> {Q, Q } + end end. notify(no_change)-> @@ -350,40 +416,12 @@ notify({X1 = #exchange{}, X2 = #exchange{}}) -> notify({Q1 = #amqqueue{}, Q2 = #amqqueue{}}) -> rabbit_amqqueue:policy_changed(Q1, Q2). -match(Name, Policies, OpPolicies) -> - MatchedPolicies = match_all(Name, Policies), - MatchedOpPolicies = match_all(Name, OpPolicies), - case {MatchedPolicies, MatchedOpPolicies} of - {[], []} -> undefined; - {[Policy | _], []} -> Policy; - {[], [OpPolicy | _]} -> OpPolicy; - {[Policy | _], [OpPolicy | _]} -> merge_op_policy(Policy, OpPolicy) - end. - -merge_op_policy(Policy, OpPolicy) -> - OpDefinition = pget(definition, OpPolicy), - Definition = pget(definition, Policy), - ResultDefinition = lists:map(fun(Key) -> - case {pget(Key, Definition), pget(Key, OpDefinition)} of - {undefined, undefined} -> undefined; - {undefined, Val} -> Val; - {Val, undefined} -> Val; - {Val, OpVal} -> merge_policy_value(Key, Val, OpVal) - end - end), - lists:keyreplace(definition, 1, Policy, {definition, ResultDefinition}). - -merge_policy_value(Name, PolicyVal, OpVal) -> - case policy_merge_strategy(Name) of - undefined -> PolicyVal; - Strategy -> Strategy(PolicyVal, OpVal) +match(Name, Policies) -> + case match_all(Name, Policies) of + [] -> undefined; + [Policy | _] -> Policy end. -policy_merge_strategy(<<"message-ttl">>) -> fun erlang:min/2; -policy_merge_strategy(<<"max-length">>) -> fun erlang:min/2; -policy_merge_strategy(<<"max-length-bytes">>) -> fun erlang:min/2; -policy_merge_strategy(_) -> undefined. - match_all(Name, Policies) -> lists:sort(fun sort_pred/2, [P || P <- Policies, matches(Name, P)]). @@ -402,17 +440,29 @@ sort_pred(A, B) -> pget(priority, A) >= pget(priority, B). %%---------------------------------------------------------------------------- +operator_policy_validation() -> + [{<<"priority">>, fun rabbit_parameter_validation:number/2, mandatory}, + {<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory}, + {<<"apply-to">>, fun apply_to_validation/2, optional}, + {<<"definition">>, fun validation_op/2, mandatory}]. + policy_validation() -> [{<<"priority">>, fun rabbit_parameter_validation:number/2, mandatory}, {<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory}, {<<"apply-to">>, fun apply_to_validation/2, optional}, {<<"definition">>, fun validation/2, mandatory}]. -validation(_Name, []) -> +validation_op(Name, Terms) -> + validation(Name, Terms, operator_policy_validator). + +validation(Name, Terms) -> + validation(Name, Terms, policy_validator). + +validation(_Name, [], _Validator) -> {error, "no policy provided", []}; -validation(_Name, Terms) when is_list(Terms) -> +validation(_Name, Terms, Validator) when is_list(Terms) -> {Keys, Modules} = lists:unzip( - rabbit_registry:lookup_all(policy_validator)), + rabbit_registry:lookup_all(Validator)), [] = dups(Keys), %% ASSERTION Validators = lists:zipwith(fun (M, K) -> {M, a2b(K)} end, Modules, Keys), case is_proplist(Terms) of @@ -423,7 +473,7 @@ validation(_Name, Terms) when is_list(Terms) -> end; false -> {error, "definition must be a dictionary: ~p", [Terms]} end; -validation(_Name, Term) -> +validation(_Name, Term, _Validator) -> {error, "parse error while reading policy: ~p", [Term]}. validation0(Validators, Terms) -> diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 3d624752ea..1a2d3fab21 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -55,6 +55,7 @@ -rabbit_upgrade({policy_version, mnesia, [recoverable_slaves]}). -rabbit_upgrade({slave_pids_pending_shutdown, mnesia, [policy_version]}). -rabbit_upgrade({user_password_hashing, mnesia, [hash_passwords]}). +-rabbit_upgrade({operator_policies, mnesia, [slave_pids_pending_shutdown, internal_system_x]}). %% ------------------------------------------------------------------- @@ -481,6 +482,37 @@ slave_pids_pending_shutdown(Table) -> sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators, state, policy_version, slave_pids_pending_shutdown]). +operator_policies() -> + ok = exchange_operator_policies(rabbit_exchange), + ok = exchange_operator_policies(rabbit_durable_exchange), + ok = queue_operator_policies(rabbit_queue), + ok = queue_operator_policies(rabbit_durable_queue). + +exchange_operator_policies(Table) -> + transform( + Table, + fun ({exchange, Name, Type, Dur, AutoDel, Internal, + Args, Scratches, Policy, Decorators}) -> + {exchange, Name, Type, Dur, AutoDel, Internal, + Args, Scratches, Policy, undefined, Decorators} + end, + [name, type, durable, auto_delete, internal, arguments, scratches, policy, + operator_policy, decorators]). + +queue_operator_policies(Table) -> + transform( + Table, + fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators, + State, PolicyVersion, SlavePidsPendingShutdown}) -> + {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, DSN, Policy, undefined, GmPids, + Decorators, State, PolicyVersion, SlavePidsPendingShutdown} + end, + [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, + sync_slave_pids, recoverable_slaves, policy, operator_policy, + gm_pids, decorators, state, policy_version, slave_pids_pending_shutdown]). + %% Prior to 3.6.0, passwords were hashed using MD5, this populates %% existing records with said default. Users created with 3.6.0+ will %% have internal_user.hashing_algorithm populated by the internal |
