diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-05-17 17:52:01 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-05-17 17:52:01 +0100 |
| commit | 25484ae2e09a4ad29520b8e7bed6f55c2e3d595d (patch) | |
| tree | 2839bdceb742a39e00705a6346f1a3aa0ba4cd69 | |
| parent | 01c649e6ef942271d12f6a7620ca2fdbb8c8464f (diff) | |
| download | rabbitmq-server-git-25484ae2e09a4ad29520b8e7bed6f55c2e3d595d.tar.gz | |
First sketch of a policy mechanism.
| -rw-r--r-- | include/rabbit.hrl | 4 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 30 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 52 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 75 | ||||
| -rw-r--r-- | src/rabbit_exchange_decorator.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_exchange_type.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_direct.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_fanout.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_headers.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_invalid.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_topic.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_policy.erl | 202 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 29 |
13 files changed, 361 insertions, 60 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 8de31490c8..e8b4a6232e 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -43,11 +43,11 @@ -record(resource, {virtual_host, kind, name}). -record(exchange, {name, type, durable, auto_delete, internal, arguments, - scratches}). + scratches, policy}). -record(exchange_serial, {name, next}). -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, - arguments, pid, slave_pids, mirror_nodes}). + arguments, pid, slave_pids, mirror_nodes, policy}). %% mnesia doesn't like unary records, so we add a dummy 'value' field -record(route, {binding, value = const}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c1673504e7..637a7d4d0b 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -28,7 +28,7 @@ -export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]). -export([notify_down_all/2, limit_all/3]). -export([on_node_down/1]). --export([store_queue/1]). +-export([update/2, store_queue/1, policy_changed/2]). %% internal @@ -71,6 +71,9 @@ -spec(internal_declare/2 :: (rabbit_types:amqqueue(), boolean()) -> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())). +-spec(update/2 :: + (name(), + fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue())) -> 'ok'). -spec(lookup/1 :: (name()) -> rabbit_types:ok(rabbit_types:amqqueue()) | rabbit_types:error('not_found'); @@ -157,6 +160,8 @@ -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()). -spec(store_queue/1 :: (rabbit_types:amqqueue()) -> 'ok'). +-spec(policy_changed/2 :: + (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). -endif. @@ -222,9 +227,10 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> case mnesia:read({rabbit_durable_queue, QueueName}) of - [] -> ok = store_queue(Q), - B = add_default_binding(Q), - fun () -> B(), Q end; + [] -> Q1 = rabbit_policy:set(Q), + ok = store_queue(Q1), + B = add_default_binding(Q1), + fun () -> B(), Q1 end; %% Q exists on stopped node [_] -> rabbit_misc:const(not_found) end; @@ -237,6 +243,19 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> end end). +update(Name, Fun) -> + case mnesia:wread({rabbit_queue, Name}) of + [Q = #amqqueue{durable = Durable}] -> + Q1 = Fun(Q), + ok = mnesia:write(rabbit_queue, Q1, write), + case Durable of + true -> ok = mnesia:write(rabbit_durable_queue, Q1, write); + _ -> ok + end; + [] -> + ok + end. + store_queue(Q = #amqqueue{durable = true}) -> ok = mnesia:write(rabbit_durable_queue, Q#amqqueue{slave_pids = []}, write), ok = mnesia:write(rabbit_queue, Q, write), @@ -245,6 +264,9 @@ store_queue(Q = #amqqueue{durable = false}) -> ok = mnesia:write(rabbit_queue, Q, write), ok. +policy_changed(_Q1, _Q2) -> + ok. + determine_queue_nodes(Args) -> Policy = rabbit_misc:table_lookup(Args, <<"x-ha-policy">>), PolicyParams = rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>), diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5701efeb99..bbc20a78c4 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -197,23 +197,41 @@ declare(Recover, From, State = #q{q = Q, backing_queue = BQ, backing_queue_state = undefined}) -> case rabbit_amqqueue:internal_declare(Q, Recover) of - not_found -> {stop, normal, not_found, State}; - Q -> gen_server2:reply(From, {new, Q}), - ok = file_handle_cache:register_callback( - rabbit_amqqueue, set_maximum_since_use, - [self()]), - ok = rabbit_memory_monitor:register( - self(), {rabbit_amqqueue, - set_ram_duration_target, [self()]}), - BQS = bq_init(BQ, Q, Recover), - State1 = process_args(State#q{backing_queue_state = BQS}), - rabbit_event:notify(queue_created, - infos(?CREATION_EVENT_KEYS, State1)), - rabbit_event:if_enabled(State1, #q.stats_timer, - fun() -> emit_stats(State1) end), - noreply(State1); - Q1 -> {stop, normal, {existing, Q1}, State} - end. + not_found -> + {stop, normal, not_found, State}; + Q1 -> + case matches(Recover, Q, Q1) of + true -> + gen_server2:reply(From, {new, Q}), + ok = file_handle_cache:register_callback( + rabbit_amqqueue, set_maximum_since_use, [self()]), + ok = rabbit_memory_monitor:register( + self(), {rabbit_amqqueue, + set_ram_duration_target, [self()]}), + BQS = bq_init(BQ, Q, Recover), + State1 = process_args(State#q{backing_queue_state = BQS}), + rabbit_event:notify(queue_created, + infos(?CREATION_EVENT_KEYS, State1)), + rabbit_event:if_enabled(State1, #q.stats_timer, + fun() -> emit_stats(State1) end), + noreply(State1); + false -> + {stop, normal, {existing, Q1}, State} + end + end. + +matches(true, Q, Q) -> true; +matches(true, Q, Q1) -> false; +matches(false, Q1, Q2) -> + %% i.e. not policy + Q1#amqqueue.name =:= Q2#amqqueue.name andalso + Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso + Q1#amqqueue.auto_delete =:= Q2#amqqueue.auto_delete andalso + Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso + Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso + Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso + Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids andalso + Q1#amqqueue.mirror_nodes =:= Q2#amqqueue.mirror_nodes. bq_init(BQ, Q, Recover) -> Self = self(), diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 7455c797e3..d87c607d32 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -18,13 +18,13 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([recover/0, callback/4, declare/6, +-export([recover/0, policy_changed/2, callback/4, declare/6, assert_equivalence/6, assert_args_equivalence/2, check_type/1, lookup/1, lookup_or_die/1, list/1, lookup_scratch/2, update_scratch/3, info_keys/0, info/1, info/2, info_all/1, info_all/2, route/2, delete/2]). %% these must be run inside a mnesia tx --export([maybe_auto_delete/1, serial/1, peek_serial/1]). +-export([maybe_auto_delete/1, serial/1, peek_serial/1, update/2]). %%---------------------------------------------------------------------------- @@ -40,6 +40,8 @@ -spec(callback/4:: (rabbit_types:exchange(), fun_name(), non_neg_integer() | atom(), [any()]) -> 'ok'). +-spec(policy_changed/2 :: + (rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'). -spec(declare/6 :: (name(), type(), boolean(), boolean(), boolean(), rabbit_framing:amqp_table()) @@ -64,6 +66,9 @@ rabbit_types:ok(term()) | rabbit_types:error('not_found')). -spec(update_scratch/3 :: (name(), atom(), fun((any()) -> any())) -> 'ok'). +-spec(update/2 :: + (name(), + fun((rabbit_types:exchange()) -> rabbit_types:exchange())) -> 'ok'). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (rabbit_types:exchange()) -> rabbit_types:infos()). -spec(info/2 :: @@ -89,7 +94,8 @@ %%---------------------------------------------------------------------------- --define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments]). +-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments, + policy]). recover() -> Xs = rabbit_misc:table_filter( @@ -119,6 +125,8 @@ callback(X = #exchange{type = XType}, Fun, Serial0, Args) -> Module = type_to_module(XType), apply(Module, Fun, [Serial(Module:serialise_events()) | Args]). +policy_changed(X1, X2) -> callback(X1, policy_changed, none, [X1, X2]). + serialise_events(X = #exchange{type = Type}) -> case [Serialise || M <- decorators(), Serialise <- [M:serialise_events(X)], @@ -140,12 +148,12 @@ decorators() -> [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)]. declare(XName, Type, Durable, AutoDelete, Internal, Args) -> - X = #exchange{name = XName, - type = Type, - durable = Durable, - auto_delete = AutoDelete, - internal = Internal, - arguments = Args}, + X = rabbit_policy:set(#exchange{name = XName, + type = Type, + durable = Durable, + auto_delete = AutoDelete, + internal = Internal, + arguments = Args}), XT = type_to_module(Type), %% We want to upset things if it isn't ok ok = XT:validate(X), @@ -252,29 +260,35 @@ lookup_scratch(Name, App) -> update_scratch(Name, App, Fun) -> rabbit_misc:execute_mnesia_transaction( fun() -> - case mnesia:wread({rabbit_exchange, Name}) of - [X = #exchange{durable = Durable, scratches = Scratches0}] -> - Scratches1 = case Scratches0 of - undefined -> orddict:new(); - _ -> Scratches0 - end, - Scratch = case orddict:find(App, Scratches1) of - {ok, S} -> S; - error -> undefined - end, - Scratches2 = orddict:store(App, Fun(Scratch), Scratches1), - X1 = X#exchange{scratches = Scratches2}, - ok = mnesia:write(rabbit_exchange, X1, write), - case Durable of - true -> ok = mnesia:write(rabbit_durable_exchange, - X1, write); - _ -> ok - end; - [] -> - ok - end + update(Name, + fun(X = #exchange{scratches = Scratches0}) -> + Scratches1 = case Scratches0 of + undefined -> orddict:new(); + _ -> Scratches0 + end, + Scratch = case orddict:find(App, Scratches1) of + {ok, S} -> S; + error -> undefined + end, + Scratches2 = orddict:store( + App, Fun(Scratch), Scratches1), + X#exchange{scratches = Scratches2} + end) end). +update(Name, Fun) -> + case mnesia:wread({rabbit_exchange, Name}) of + [X = #exchange{durable = Durable}] -> + X1 = Fun(X), + ok = mnesia:write(rabbit_exchange, X1, write), + case Durable of + true -> ok = mnesia:write(rabbit_durable_exchange, X1, write); + _ -> ok + end; + [] -> + ok + end. + info_keys() -> ?INFO_KEYS. map(VHostPath, F) -> @@ -290,6 +304,7 @@ i(durable, #exchange{durable = Durable}) -> Durable; i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete; i(internal, #exchange{internal = Internal}) -> Internal; i(arguments, #exchange{arguments = Arguments}) -> Arguments; +i(policy, X) -> rabbit_policy:name(X); i(Item, _) -> throw({bad_argument, Item}). info(X = #exchange{}) -> infos(?INFO_KEYS, X). diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl index 4fa87485bb..9f30688df9 100644 --- a/src/rabbit_exchange_decorator.erl +++ b/src/rabbit_exchange_decorator.erl @@ -18,6 +18,8 @@ -export([behaviour_info/1]). +%% TODO make this into a modern typed callback + behaviour_info(callbacks) -> [ {description, 0}, @@ -40,7 +42,10 @@ behaviour_info(callbacks) -> {add_binding, 3}, %% called after bindings have been deleted. - {remove_bindings, 3} + {remove_bindings, 3}, + + %% called when the policy attached to this exchange changes. + {policy_changed, 3} ]; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index 1027570c8b..e6470b721e 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -58,6 +58,10 @@ rabbit_framing:amqp_table()) -> 'ok' | rabbit_types:connection_exit(). +%% called when the policy attached to this exchange changes. +-callback policy_changed ( + serial(), rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'. + -else. -export([behaviour_info/1]). @@ -65,7 +69,7 @@ behaviour_info(callbacks) -> [{description, 0}, {serialise_events, 0}, {route, 2}, {validate, 1}, {create, 2}, {delete, 3}, {add_binding, 3}, {remove_bindings, 3}, - {assert_args_equivalence, 2}]; + {assert_args_equivalence, 2}, {policy_changed, 3}]; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index cdec1cb9f2..9a5665c078 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -20,7 +20,7 @@ -behaviour(rabbit_exchange_type). -export([description/0, serialise_events/0, route/2]). --export([validate/1, create/2, delete/3, +-export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -rabbit_boot_step({?MODULE, @@ -43,6 +43,7 @@ route(#exchange{name = Name}, validate(_X) -> ok. create(_Tx, _X) -> ok. delete(_Tx, _X, _Bs) -> ok. +policy_changed(_Tx, _X1, _X2) -> ok. add_binding(_Tx, _X, _B) -> ok. remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index a64f2c2924..d9a2f60fdd 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -20,7 +20,7 @@ -behaviour(rabbit_exchange_type). -export([description/0, serialise_events/0, route/2]). --export([validate/1, create/2, delete/3, add_binding/3, +-export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -rabbit_boot_step({?MODULE, @@ -42,6 +42,7 @@ route(#exchange{name = Name}, _Delivery) -> validate(_X) -> ok. create(_Tx, _X) -> ok. delete(_Tx, _X, _Bs) -> ok. +policy_changed(_Tx, _X1, _X2) -> ok. add_binding(_Tx, _X, _B) -> ok. remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index 61917d8f4a..516b78e59c 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -21,7 +21,7 @@ -behaviour(rabbit_exchange_type). -export([description/0, serialise_events/0, route/2]). --export([validate/1, create/2, delete/3, add_binding/3, +-export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -rabbit_boot_step({?MODULE, @@ -116,6 +116,7 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], validate(_X) -> ok. create(_Tx, _X) -> ok. delete(_Tx, _X, _Bs) -> ok. +policy_changed(_Tx, _X1, _X2) -> ok. add_binding(_Tx, _X, _B) -> ok. remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl index 82d27960bc..101fe434e9 100644 --- a/src/rabbit_exchange_type_invalid.erl +++ b/src/rabbit_exchange_type_invalid.erl @@ -20,7 +20,7 @@ -behaviour(rabbit_exchange_type). -export([description/0, serialise_events/0, route/2]). --export([validate/1, create/2, delete/3, +-export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). description() -> @@ -40,6 +40,7 @@ route(#exchange{name = Name, type = Type}, _) -> validate(_X) -> ok. create(_Tx, _X) -> ok. delete(_Tx, _X, _Bs) -> ok. +policy_changed(_Tx, _X1, _X2) -> ok. add_binding(_Tx, _X, _B) -> ok. remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 3160fdf46e..644d9acf07 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -21,7 +21,7 @@ -behaviour(rabbit_exchange_type). -export([description/0, serialise_events/0, route/2]). --export([validate/1, create/2, delete/3, add_binding/3, +-export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -rabbit_boot_step({?MODULE, @@ -58,6 +58,8 @@ delete(transaction, #exchange{name = X}, _Bs) -> delete(none, _Exchange, _Bs) -> ok. +policy_changed(_Tx, _X1, _X2) -> ok. + add_binding(transaction, _Exchange, Binding) -> internal_add_binding(Binding); add_binding(none, _Exchange, _Binding) -> diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl new file mode 100644 index 0000000000..2697b93ddd --- /dev/null +++ b/src/rabbit_policy.erl @@ -0,0 +1,202 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(rabbit_policy). + +-behaviour(rabbit_runtime_parameter). + +-include("rabbit.hrl"). + +-import(rabbit_misc, [pget/2]). + +-export([register/0]). +-export([name/1, get/2, set/1]). +-export([validate/3, validate_clear/2, notify/3, notify_clear/2]). + +-rabbit_boot_step({?MODULE, + [{description, "policy parameters"}, + {mfa, {rabbit_policy, register, []}}, + {requires, rabbit_registry}, + {enables, recovery}]}). + +register() -> + rabbit_registry:register(runtime_parameter, <<"policy">>, ?MODULE). + +name(#amqqueue{policy = Policy}) -> name0(Policy); +name(#exchange{policy = Policy}) -> name0(Policy). + +name0(undefined) -> none; +name0(Policy) -> pget(<<"name">>, Policy). + +set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)}; +set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}. + +set0(Name) -> match(Name, list()). + +get(Name, #amqqueue{policy = Policy}) -> get0(Name, Policy); +get(Name, #exchange{policy = Policy}) -> get0(Name, Policy). + +get0(_Name, undefined) -> {error, not_found}; +get0(Name, List) -> case pget(<<"policy">>, List) of + undefined -> {error, not_found}; + Policy -> case pget(Name, Policy) of + undefined -> {error, not_found}; + Value -> {ok, Value} + end + end. + +%%---------------------------------------------------------------------------- + +validate(<<"policy">>, _Name, Term) -> + assert_contents(policy_validation(), Term). + +validate_clear(<<"policy">>, _Name) -> + ok. + +notify(<<"policy">>, _Name, _Term) -> + update_policies(). + +notify_clear(<<"policy">>, _Name) -> + update_policies(). + +%%---------------------------------------------------------------------------- + +list() -> + [[{<<"name">>, pget(key, P)} | pget(value, P)] + || P <- rabbit_runtime_parameters:list(<<"policy">>)]. + +update_policies() -> + Policies = list(), + {Xs, Qs} = rabbit_misc:execute_mnesia_transaction( + fun() -> + {[update_exchange(X, Policies) || + VHost <- rabbit_vhost:list(), + X <- rabbit_exchange:list(VHost)], + [update_queue(Q, Policies) || + VHost <- rabbit_vhost:list(), + Q <- rabbit_amqqueue:list(VHost)]} + end), + [notify(X) || X <- Xs], + [notify(Q) || Q <- Qs], + ok. + +update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) -> + NewPolicy = match(XName, Policies), + case NewPolicy of + OldPolicy -> no_change; + _ -> rabbit_exchange:update( + XName, fun(X1) -> X1#exchange{policy = NewPolicy} end), + {X, X#exchange{policy = NewPolicy}} + end. + +update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) -> + NewPolicy = match(QName, Policies), + case NewPolicy of + OldPolicy -> no_change; + _ -> rabbit_amqqueue:update( + QName, fun(Q1) -> Q1#amqqueue{policy = NewPolicy} end), + {Q, Q#amqqueue{policy = NewPolicy}} + end. + +notify(no_change)-> + ok; +notify({X1 = #exchange{}, X2 = #exchange{}}) -> + rabbit_exchange:policy_changed(X1, X2); +notify({Q1 = #amqqueue{}, Q2 = #amqqueue{}}) -> + rabbit_amqqueue:policy_changed(Q1, Q2). + +match(Name, Policies) -> + case lists:sort(fun sort_pred/2, [P || P <- Policies, matches(Name, P)]) of + [] -> undefined; + [Policy | _Rest] -> Policy + end. + +matches(#resource{name = Name, virtual_host = VHost}, Policy) -> + Prefix = pget(<<"prefix">>, Policy), + case pget(<<"vhost">>, Policy) of + undefined -> prefix(Prefix, Name); + VHost -> prefix(Prefix, Name); + _ -> false + end. + +prefix(A, B) -> lists:prefix(binary_to_list(A), binary_to_list(B)). + +sort_pred(A, B) -> + R = size(pget(<<"prefix">>, A)) >= size(pget(<<"prefix">>, B)), + case {pget(<<"vhost">>, A), pget(<<"vhost">>, B)} of + {undefined, undefined} -> R; + {undefined, _} -> true; + {_, undefined} -> false; + _ -> R + end. + +%%---------------------------------------------------------------------------- + +policy_validation() -> + [{<<"vhost">>, binary, optional}, + {<<"prefix">>, binary, mandatory}, + {<<"policy">>, list, mandatory}]. + +%% TODO this is mostly duplicated from +%% rabbit_federation_parameters. Sort that out in some way. + +assert_type(Name, {Type, Opts}, Term) -> + assert_type(Name, Type, Term), + case lists:member(Term, Opts) of + true -> ok; + false -> {error, "~s must be one of ~p", [Name, Opts]} + end; + +assert_type(_Name, number, Term) when is_number(Term) -> + ok; + +assert_type(Name, number, Term) -> + {error, "~s should be number, actually was ~p", [Name, Term]}; + +assert_type(_Name, binary, Term) when is_binary(Term) -> + ok; + +assert_type(Name, binary, Term) -> + {error, "~s should be binary, actually was ~p", [Name, Term]}; + +assert_type(_Name, list, Term) when is_list(Term) -> + ok; + +assert_type(Name, list, Term) -> + {error, "~s should be list, actually was ~p", [Name, Term]}. + +assert_contents(Constraints, Term) when is_list(Term) -> + {Results, Remainder} + = lists:foldl( + fun ({Name, Constraint, Needed}, {Results0, Term0}) -> + case {lists:keytake(Name, 1, Term0), Needed} of + {{value, {Name, Value}, Term1}, _} -> + {[assert_type(Name, Constraint, Value) | Results0], + Term1}; + {false, mandatory} -> + {[{error, "Key \"~s\" not found", [Name]} | + Results0], Term0}; + {false, optional} -> + {Results0, Term0} + end + end, {[], Term}, Constraints), + case Remainder of + [] -> Results; + _ -> [{error, "Unrecognised terms ~p", [Remainder]} | Results] + end; + +assert_contents(_Constraints, Term) -> + {error, "Not a list ~p", [Term]}. diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 87e560e869..18704807ba 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -38,6 +38,8 @@ -rabbit_upgrade({topic_trie_node, mnesia, []}). -rabbit_upgrade({runtime_parameters, mnesia, []}). -rabbit_upgrade({exchange_scratches, mnesia, [exchange_scratch]}). +-rabbit_upgrade({policy, mnesia, + [exchange_scratches, ha_mirrors]}). %% ------------------------------------------------------------------- @@ -59,6 +61,7 @@ -spec(mirrored_supervisor/0 :: () -> 'ok'). -spec(topic_trie_node/0 :: () -> 'ok'). -spec(runtime_parameters/0 :: () -> 'ok'). +-spec(policy/0 :: () -> 'ok'). -endif. @@ -211,6 +214,32 @@ exchange_scratches(Table) -> end, [name, type, durable, auto_delete, internal, arguments, scratches]). +policy() -> + ok = exchange_policy(rabbit_exchange), + ok = exchange_policy(rabbit_durable_exchange), + ok = queue_policy(rabbit_queue), + ok = queue_policy(rabbit_durable_queue). + +exchange_policy(Table) -> + transform( + Table, + fun ({exchange, Name, Type, Dur, AutoDel, Int, Args, Scratches}) -> + {exchange, Name, Type, Dur, AutoDel, Int, Args, Scratches, + undefined} + end, + [name, type, durable, auto_delete, internal, arguments, scratches, + policy]). + +queue_policy(Table) -> + transform( + Table, + fun ({amqqueue, Name, Dur, AutoDel, Excl, Args, Pid, SPids, MNodes}) -> + {amqqueue, Name, Dur, AutoDel, Excl, Args, Pid, SPids, MNodes, + undefined} + end, + [name, durable, auto_delete, exclusive_owner, arguments, pid, + slave_pids, mirror_nodes, policy]). + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> |
