summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniil Fedotov <dfedotov@pivotal.io>2016-08-25 17:06:55 +0100
committerDaniil Fedotov <dfedotov@pivotal.io>2016-08-25 17:06:55 +0100
commit0593afdd05bed1619e197487ee1170a1b95cf6eb (patch)
tree3ba123ba304a0900d5d29bcc9f8172fa92281749
parent15116daafe3f1ef1b55427c35b27c51a5cf2592d (diff)
downloadrabbitmq-server-git-0593afdd05bed1619e197487ee1170a1b95cf6eb.tar.gz
Operator policy as a separate queue field
-rw-r--r--src/rabbit_amqqueue_process.erl12
-rw-r--r--src/rabbit_control_main.erl14
-rw-r--r--src/rabbit_policies.erl5
-rw-r--r--src/rabbit_policy.erl232
-rw-r--r--src/rabbit_upgrade_functions.erl32
5 files changed, 203 insertions, 92 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 710ec6ab2a..2c42e348ca 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -107,6 +107,8 @@
-define(STATISTICS_KEYS,
[name,
policy,
+ operator_policy,
+ effective_policy_definition,
exclusive_consumer_pid,
exclusive_consumer_tag,
messages_ready,
@@ -887,6 +889,16 @@ i(policy, #q{q = Q}) ->
none -> '';
Policy -> Policy
end;
+i(operator_policy, #q{q = Q}) ->
+ case rabbit_policy:name_op(Q) of
+ none -> '';
+ Policy -> Policy
+ end;
+i(effective_policy_definition, #q{q = Q}) ->
+ case rabbit_policy:effective_definition(Q) of
+ undefined -> [];
+ Def -> Def
+ end;
i(exclusive_consumer_pid, #q{exclusive_consumer = none}) ->
'';
i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) ->
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 13ca805bfd..1ef2518691 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -72,7 +72,10 @@
{set_policy, [?VHOST_DEF, ?PRIORITY_DEF, ?APPLY_TO_DEF]},
{clear_policy, [?VHOST_DEF]},
+ {set_operator_policy, [?VHOST_DEF, ?PRIORITY_DEF, ?APPLY_TO_DEF]},
+ {clear_operator_policy, [?VHOST_DEF]},
{list_policies, [?VHOST_DEF]},
+ {list_operator_policies, [?VHOST_DEF]},
{list_queues, [?VHOST_DEF, ?OFFLINE_DEF, ?ONLINE_DEF, ?LOCAL_DEF]},
{list_exchanges, [?VHOST_DEF]},
@@ -639,6 +642,14 @@ action(list_policies, Node, [], Opts, Inform, Timeout) ->
rabbit_policy:info_keys(),
[{timeout, Timeout}]);
+action(list_operator_policies, Node, [], Opts, Inform, Timeout) ->
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ Inform("Listing policies", []),
+ call_emitter(Node, {rabbit_policy, list_formatted_op, [VHostArg]},
+ rabbit_policy:info_keys(),
+ [{timeout, Timeout}]);
+
+
action(list_vhosts, Node, Args, _Opts, Inform, Timeout) ->
Inform("Listing vhosts", []),
ArgAtoms = default_if_empty(Args, [name]),
@@ -907,6 +918,9 @@ format_info_item([T | _] = Value, IsEscaped)
lists:nthtail(2, lists:append(
[", " ++ format_info_item(E, IsEscaped)
|| E <- Value])) ++ "]";
+format_info_item({Key, Value}, IsEscaped) ->
+ "{" ++ io_lib:format("~p", [Key]) ++ ", " ++
+ format_info_item(Value, IsEscaped) ++ "}";
format_info_item(Value, _IsEscaped) ->
io_lib:format("~w", [Value]).
diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl
index c7d4c99f37..94bbd36418 100644
--- a/src/rabbit_policies.erl
+++ b/src/rabbit_policies.erl
@@ -40,7 +40,10 @@ register() ->
{policy_validator, <<"expires">>},
{policy_validator, <<"max-length">>},
{policy_validator, <<"max-length-bytes">>},
- {policy_validator, <<"queue-mode">>}]],
+ {policy_validator, <<"queue-mode">>},
+ {operator_policy_validator, <<"message-ttl">>},
+ {operator_policy_validator, <<"max-length">>},
+ {operator_policy_validator, <<"max-length-bytes">>}]],
ok.
validate_policy(Terms) ->
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 59367fb182..89cbdcf9d4 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -38,11 +38,11 @@
-include("rabbit.hrl").
--import(rabbit_misc, [pget/2]).
+-import(rabbit_misc, [pget/2, pget/3]).
-export([register/0]).
-export([invalidate/0, recover/0]).
--export([name/1, get/2, get_arg/3, set/1]).
+-export([name/1, name_op/1, effective_definition/1, get/2, get_arg/3, set/1]).
-export([validate/5, notify/4, notify_clear/3]).
-export([parse_set/6, set/6, delete/2, lookup/2, list/0, list/1,
list_formatted/1, list_formatted/3, info_keys/0]).
@@ -56,34 +56,86 @@
{enables, recovery}]}).
register() ->
- rabbit_registry:register(runtime_parameter, <<"policy">>, ?MODULE).
-
-register() ->
+ rabbit_registry:register(runtime_parameter, <<"policy">>, ?MODULE),
rabbit_registry:register(runtime_parameter, <<"operator_policy">>, ?MODULE).
name(#amqqueue{policy = Policy}) -> name0(Policy);
name(#exchange{policy = Policy}) -> name0(Policy).
+name_op(#amqqueue{operator_policy = Policy}) -> name0(Policy);
+name_op(#exchange{operator_policy = Policy}) -> name0(Policy).
+
name0(undefined) -> none;
name0(Policy) -> pget(name, Policy).
-set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)};
-set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}.
+effective_definition(#amqqueue{policy = Policy, operator_policy = OpPolicy}) ->
+ effective_definition0(Policy, OpPolicy);
+effective_definition(#exchange{policy = Policy, operator_policy = OpPolicy}) ->
+ effective_definition0(Policy, OpPolicy).
+
+effective_definition0(undefined, undefined) -> undefined;
+effective_definition0(Policy, undefined) -> pget(definition, Policy);
+effective_definition0(undefined, OpPolicy) -> pget(definition, OpPolicy);
+effective_definition0(Policy, OpPolicy) ->
+ OpDefinition = pget(definition, OpPolicy, []),
+ Definition = pget(definition, Policy, []),
+ {Keys, _} = lists:unzip(Definition),
+ {OpKeys, _} = lists:unzip(OpDefinition),
+ lists:map(fun(Key) ->
+ case {pget(Key, Definition), pget(Key, OpDefinition)} of
+ {undefined, Val} -> {Key, Val};
+ {Val, undefined} -> {Key, Val};
+ {Val, OpVal} -> {Key, merge_policy_value(Key, Val, OpVal)}
+ end
+ end,
+ lists:umerge(Keys, OpKeys)).
+
+set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = match(Name),
+ operator_policy = match_op(Name)};
+set(X = #exchange{name = Name}) -> X#exchange{policy = match(Name),
+ operator_policy = match_op(Name)}.
+
+match(Name = #resource{virtual_host = VHost}) ->
+ match(Name, list(VHost)).
-set0(Name = #resource{virtual_host = VHost}) ->
- match(Name, list(VHost), list_op(VHost)).
+match_op(Name = #resource{virtual_host = VHost}) ->
+ match(Name, list_op(VHost)).
+
+get(Name, #amqqueue{policy = Policy, operator_policy = OpPolicy}) ->
+ get0(Name, Policy, OpPolicy);
+get(Name, #exchange{policy = Policy, operator_policy = OpPolicy}) ->
+ get0(Name, Policy, OpPolicy);
-get(Name, #amqqueue{policy = Policy}) -> get0(Name, Policy);
-get(Name, #exchange{policy = Policy}) -> get0(Name, Policy);
%% Caution - SLOW.
get(Name, EntityName = #resource{virtual_host = VHost}) ->
- get0(Name, match(EntityName, list(VHost), list_op(VHost))).
+ get0(Name,
+ match(EntityName, list(VHost)),
+ match(EntityName, list_op(VHost))).
+
+get0(_Name, undefined, undefined) -> undefined;
+get0(Name, undefined, OpPolicy) -> pget(Name, pget(definition, OpPolicy, []));
+get0(Name, Policy, undefined) -> pget(Name, pget(definition, Policy, []));
+get0(Name, Policy, OpPolicy) ->
+ OpDefinition = pget(definition, OpPolicy, []),
+ Definition = pget(definition, Policy, []),
+ case {pget(Name, Definition), pget(Name, OpDefinition)} of
+ {undefined, undefined} -> undefined;
+ {Val, undefined} -> Val;
+ {undefined, Val} -> Val;
+ {Val, OpVal} -> merge_policy_value(Name, Val, OpVal)
+ end.
+
+merge_policy_value(Name, PolicyVal, OpVal) ->
+ case policy_merge_strategy(Name) of
+ undefined -> PolicyVal;
+ Strategy -> Strategy(PolicyVal, OpVal)
+ end.
+
+policy_merge_strategy(<<"message-ttl">>) -> fun erlang:min/2;
+policy_merge_strategy(<<"max-length">>) -> fun erlang:min/2;
+policy_merge_strategy(<<"max-length-bytes">>) -> fun erlang:min/2;
+policy_merge_strategy(_) -> undefined.
-get0(_Name, undefined) -> undefined;
-get0(Name, List) -> case pget(definition, List) of
- undefined -> undefined;
- Policy -> pget(Name, Policy)
- end.
%% Many heads for optimisation
get_arg(_AName, _PName, #exchange{arguments = [], policy = undefined}) ->
@@ -124,14 +176,18 @@ recover0() ->
mnesia:write(
rabbit_durable_exchange,
rabbit_exchange_decorator:set(
- X#exchange{policy = match(Name, Policies, OpPolicies)}), write)
+ X#exchange{policy = match(Name, Policies),
+ operator_policy = match(Name, OpPolicies)}),
+ write)
end) || X = #exchange{name = Name} <- Xs],
[rabbit_misc:execute_mnesia_transaction(
fun () ->
mnesia:write(
rabbit_durable_queue,
rabbit_queue_decorator:set(
- Q#amqqueue{policy = match(Name, Policies, OpPolicies)}), write)
+ Q#amqqueue{policy = match(Name, Policies),
+ operator_policy = match(Name, OpPolicies)}),
+ write)
end) || Q = #amqqueue{name = Name} <- Qs],
ok.
@@ -263,25 +319,21 @@ info_keys() -> [vhost, name, 'apply-to', pattern, definition, priority].
validate(_VHost, <<"policy">>, Name, Term, _User) ->
rabbit_parameter_validation:proplist(
- Name, policy_validation(), Term).
-
-notify(VHost, <<"policy">>, Name, Term) ->
- rabbit_event:notify(policy_set, [{name, Name}, {vhost, VHost} | Term]),
- update_policies(VHost).
-
-notify_clear(VHost, <<"policy">>, Name) ->
- rabbit_event:notify(policy_cleared, [{name, Name}, {vhost, VHost}]),
- update_policies(VHost).
-
-% TODO: copy-paste. Check
+ Name, policy_validation(), Term);
validate(_VHost, <<"operator_policy">>, Name, Term, _User) ->
rabbit_parameter_validation:proplist(
- Name, policy_validation(), Term).
+ Name, operator_policy_validation(), Term).
+notify(VHost, <<"policy">>, Name, Term) ->
+ rabbit_event:notify(policy_set, [{name, Name}, {vhost, VHost} | Term]),
+ update_policies(VHost);
notify(VHost, <<"operator_policy">>, Name, Term) ->
rabbit_event:notify(policy_set, [{name, Name}, {vhost, VHost} | Term]),
update_policies(VHost).
+notify_clear(VHost, <<"policy">>, Name) ->
+ rabbit_event:notify(policy_cleared, [{name, Name}, {vhost, VHost}]),
+ update_policies(VHost);
notify_clear(VHost, <<"operator_policy">>, Name) ->
rabbit_event:notify(policy_cleared, [{name, Name}, {vhost, VHost}]),
update_policies(VHost).
@@ -315,32 +367,46 @@ update_policies(VHost) ->
[catch notify(Q) || Q <- Qs],
ok.
-update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies, OpPolicies) ->
- case match(XName, Policies, OpPolicies) of
- OldPolicy -> no_change;
- NewPolicy -> case rabbit_exchange:update(
- XName, fun (X0) ->
- rabbit_exchange_decorator:set(
- X0 #exchange{policy = NewPolicy})
- end) of
- #exchange{} = X1 -> {X, X1};
- not_found -> {X, X }
- end
+update_exchange(X = #exchange{name = XName,
+ policy = OldPolicy,
+ operator_policy = OldOpPolicy},
+ Policies, OpPolicies) ->
+ case {match(XName, Policies), match(XName, OpPolicies)} of
+ {OldPolicy, OldOpPolicy} -> no_change;
+ {NewPolicy, NewOpPolicy} ->
+ NewExchange = rabbit_exchange:update(
+ XName,
+ fun(X0) ->
+ rabbit_exchange_decorator:set(
+ X0 #exchange{policy = NewPolicy,
+ operator_policy = NewOpPolicy})
+ end),
+ case NewExchange of
+ #exchange{} = X1 -> {X, X1};
+ not_found -> {X, X }
+ end
end.
-update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies, OpPolicies) ->
- case match(QName, Policies, OpPolicies) of
- OldPolicy -> no_change;
- NewPolicy -> case rabbit_amqqueue:update(
- QName, fun(Q1) ->
- rabbit_queue_decorator:set(
- Q1#amqqueue{policy = NewPolicy,
- policy_version =
- Q1#amqqueue.policy_version + 1 })
- end) of
- #amqqueue{} = Q1 -> {Q, Q1};
- not_found -> {Q, Q }
- end
+update_queue(Q = #amqqueue{name = QName,
+ policy = OldPolicy,
+ operator_policy = OldOpPolicy},
+ Policies, OpPolicies) ->
+ case {match(QName, Policies), match(QName, OpPolicies)} of
+ {OldPolicy, OldOpPolicy} -> no_change;
+ {NewPolicy, NewOpPolicy} ->
+ NewQueue = rabbit_amqqueue:update(
+ QName,
+ fun(Q1) ->
+ rabbit_queue_decorator:set(
+ Q1#amqqueue{policy = NewPolicy,
+ operator_policy = NewOpPolicy,
+ policy_version =
+ Q1#amqqueue.policy_version + 1 })
+ end),
+ case NewQueue of
+ #amqqueue{} = Q1 -> {Q, Q1};
+ not_found -> {Q, Q }
+ end
end.
notify(no_change)->
@@ -350,40 +416,12 @@ notify({X1 = #exchange{}, X2 = #exchange{}}) ->
notify({Q1 = #amqqueue{}, Q2 = #amqqueue{}}) ->
rabbit_amqqueue:policy_changed(Q1, Q2).
-match(Name, Policies, OpPolicies) ->
- MatchedPolicies = match_all(Name, Policies),
- MatchedOpPolicies = match_all(Name, OpPolicies),
- case {MatchedPolicies, MatchedOpPolicies} of
- {[], []} -> undefined;
- {[Policy | _], []} -> Policy;
- {[], [OpPolicy | _]} -> OpPolicy;
- {[Policy | _], [OpPolicy | _]} -> merge_op_policy(Policy, OpPolicy)
- end.
-
-merge_op_policy(Policy, OpPolicy) ->
- OpDefinition = pget(definition, OpPolicy),
- Definition = pget(definition, Policy),
- ResultDefinition = lists:map(fun(Key) ->
- case {pget(Key, Definition), pget(Key, OpDefinition)} of
- {undefined, undefined} -> undefined;
- {undefined, Val} -> Val;
- {Val, undefined} -> Val;
- {Val, OpVal} -> merge_policy_value(Key, Val, OpVal)
- end
- end),
- lists:keyreplace(definition, 1, Policy, {definition, ResultDefinition}).
-
-merge_policy_value(Name, PolicyVal, OpVal) ->
- case policy_merge_strategy(Name) of
- undefined -> PolicyVal;
- Strategy -> Strategy(PolicyVal, OpVal)
+match(Name, Policies) ->
+ case match_all(Name, Policies) of
+ [] -> undefined;
+ [Policy | _] -> Policy
end.
-policy_merge_strategy(<<"message-ttl">>) -> fun erlang:min/2;
-policy_merge_strategy(<<"max-length">>) -> fun erlang:min/2;
-policy_merge_strategy(<<"max-length-bytes">>) -> fun erlang:min/2;
-policy_merge_strategy(_) -> undefined.
-
match_all(Name, Policies) ->
lists:sort(fun sort_pred/2, [P || P <- Policies, matches(Name, P)]).
@@ -402,17 +440,29 @@ sort_pred(A, B) -> pget(priority, A) >= pget(priority, B).
%%----------------------------------------------------------------------------
+operator_policy_validation() ->
+ [{<<"priority">>, fun rabbit_parameter_validation:number/2, mandatory},
+ {<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory},
+ {<<"apply-to">>, fun apply_to_validation/2, optional},
+ {<<"definition">>, fun validation_op/2, mandatory}].
+
policy_validation() ->
[{<<"priority">>, fun rabbit_parameter_validation:number/2, mandatory},
{<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory},
{<<"apply-to">>, fun apply_to_validation/2, optional},
{<<"definition">>, fun validation/2, mandatory}].
-validation(_Name, []) ->
+validation_op(Name, Terms) ->
+ validation(Name, Terms, operator_policy_validator).
+
+validation(Name, Terms) ->
+ validation(Name, Terms, policy_validator).
+
+validation(_Name, [], _Validator) ->
{error, "no policy provided", []};
-validation(_Name, Terms) when is_list(Terms) ->
+validation(_Name, Terms, Validator) when is_list(Terms) ->
{Keys, Modules} = lists:unzip(
- rabbit_registry:lookup_all(policy_validator)),
+ rabbit_registry:lookup_all(Validator)),
[] = dups(Keys), %% ASSERTION
Validators = lists:zipwith(fun (M, K) -> {M, a2b(K)} end, Modules, Keys),
case is_proplist(Terms) of
@@ -423,7 +473,7 @@ validation(_Name, Terms) when is_list(Terms) ->
end;
false -> {error, "definition must be a dictionary: ~p", [Terms]}
end;
-validation(_Name, Term) ->
+validation(_Name, Term, _Validator) ->
{error, "parse error while reading policy: ~p", [Term]}.
validation0(Validators, Terms) ->
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 3d624752ea..1a2d3fab21 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -55,6 +55,7 @@
-rabbit_upgrade({policy_version, mnesia, [recoverable_slaves]}).
-rabbit_upgrade({slave_pids_pending_shutdown, mnesia, [policy_version]}).
-rabbit_upgrade({user_password_hashing, mnesia, [hash_passwords]}).
+-rabbit_upgrade({operator_policies, mnesia, [slave_pids_pending_shutdown, internal_system_x]}).
%% -------------------------------------------------------------------
@@ -481,6 +482,37 @@ slave_pids_pending_shutdown(Table) ->
sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators, state,
policy_version, slave_pids_pending_shutdown]).
+operator_policies() ->
+ ok = exchange_operator_policies(rabbit_exchange),
+ ok = exchange_operator_policies(rabbit_durable_exchange),
+ ok = queue_operator_policies(rabbit_queue),
+ ok = queue_operator_policies(rabbit_durable_queue).
+
+exchange_operator_policies(Table) ->
+ transform(
+ Table,
+ fun ({exchange, Name, Type, Dur, AutoDel, Internal,
+ Args, Scratches, Policy, Decorators}) ->
+ {exchange, Name, Type, Dur, AutoDel, Internal,
+ Args, Scratches, Policy, undefined, Decorators}
+ end,
+ [name, type, durable, auto_delete, internal, arguments, scratches, policy,
+ operator_policy, decorators]).
+
+queue_operator_policies(Table) ->
+ transform(
+ Table,
+ fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators,
+ State, PolicyVersion, SlavePidsPendingShutdown}) ->
+ {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, DSN, Policy, undefined, GmPids,
+ Decorators, State, PolicyVersion, SlavePidsPendingShutdown}
+ end,
+ [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
+ sync_slave_pids, recoverable_slaves, policy, operator_policy,
+ gm_pids, decorators, state, policy_version, slave_pids_pending_shutdown]).
+
%% Prior to 3.6.0, passwords were hashed using MD5, this populates
%% existing records with said default. Users created with 3.6.0+ will
%% have internal_user.hashing_algorithm populated by the internal