summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-05-17 17:52:01 +0100
committerSimon MacMullen <simon@rabbitmq.com>2012-05-17 17:52:01 +0100
commit25484ae2e09a4ad29520b8e7bed6f55c2e3d595d (patch)
tree2839bdceb742a39e00705a6346f1a3aa0ba4cd69
parent01c649e6ef942271d12f6a7620ca2fdbb8c8464f (diff)
downloadrabbitmq-server-git-25484ae2e09a4ad29520b8e7bed6f55c2e3d595d.tar.gz
First sketch of a policy mechanism.
-rw-r--r--include/rabbit.hrl4
-rw-r--r--src/rabbit_amqqueue.erl30
-rw-r--r--src/rabbit_amqqueue_process.erl52
-rw-r--r--src/rabbit_exchange.erl75
-rw-r--r--src/rabbit_exchange_decorator.erl7
-rw-r--r--src/rabbit_exchange_type.erl6
-rw-r--r--src/rabbit_exchange_type_direct.erl3
-rw-r--r--src/rabbit_exchange_type_fanout.erl3
-rw-r--r--src/rabbit_exchange_type_headers.erl3
-rw-r--r--src/rabbit_exchange_type_invalid.erl3
-rw-r--r--src/rabbit_exchange_type_topic.erl4
-rw-r--r--src/rabbit_policy.erl202
-rw-r--r--src/rabbit_upgrade_functions.erl29
13 files changed, 361 insertions, 60 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 8de31490c8..e8b4a6232e 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -43,11 +43,11 @@
-record(resource, {virtual_host, kind, name}).
-record(exchange, {name, type, durable, auto_delete, internal, arguments,
- scratches}).
+ scratches, policy}).
-record(exchange_serial, {name, next}).
-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
- arguments, pid, slave_pids, mirror_nodes}).
+ arguments, pid, slave_pids, mirror_nodes, policy}).
%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c1673504e7..637a7d4d0b 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -28,7 +28,7 @@
-export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]).
-export([notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
--export([store_queue/1]).
+-export([update/2, store_queue/1, policy_changed/2]).
%% internal
@@ -71,6 +71,9 @@
-spec(internal_declare/2 ::
(rabbit_types:amqqueue(), boolean())
-> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())).
+-spec(update/2 ::
+ (name(),
+ fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue())) -> 'ok').
-spec(lookup/1 ::
(name()) -> rabbit_types:ok(rabbit_types:amqqueue()) |
rabbit_types:error('not_found');
@@ -157,6 +160,8 @@
-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()).
-spec(store_queue/1 :: (rabbit_types:amqqueue()) -> 'ok').
+-spec(policy_changed/2 ::
+ (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok').
-endif.
@@ -222,9 +227,10 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] ->
case mnesia:read({rabbit_durable_queue, QueueName}) of
- [] -> ok = store_queue(Q),
- B = add_default_binding(Q),
- fun () -> B(), Q end;
+ [] -> Q1 = rabbit_policy:set(Q),
+ ok = store_queue(Q1),
+ B = add_default_binding(Q1),
+ fun () -> B(), Q1 end;
%% Q exists on stopped node
[_] -> rabbit_misc:const(not_found)
end;
@@ -237,6 +243,19 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
end
end).
+update(Name, Fun) ->
+ case mnesia:wread({rabbit_queue, Name}) of
+ [Q = #amqqueue{durable = Durable}] ->
+ Q1 = Fun(Q),
+ ok = mnesia:write(rabbit_queue, Q1, write),
+ case Durable of
+ true -> ok = mnesia:write(rabbit_durable_queue, Q1, write);
+ _ -> ok
+ end;
+ [] ->
+ ok
+ end.
+
store_queue(Q = #amqqueue{durable = true}) ->
ok = mnesia:write(rabbit_durable_queue, Q#amqqueue{slave_pids = []}, write),
ok = mnesia:write(rabbit_queue, Q, write),
@@ -245,6 +264,9 @@ store_queue(Q = #amqqueue{durable = false}) ->
ok = mnesia:write(rabbit_queue, Q, write),
ok.
+policy_changed(_Q1, _Q2) ->
+ ok.
+
determine_queue_nodes(Args) ->
Policy = rabbit_misc:table_lookup(Args, <<"x-ha-policy">>),
PolicyParams = rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>),
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5701efeb99..bbc20a78c4 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -197,23 +197,41 @@ declare(Recover, From, State = #q{q = Q,
backing_queue = BQ,
backing_queue_state = undefined}) ->
case rabbit_amqqueue:internal_declare(Q, Recover) of
- not_found -> {stop, normal, not_found, State};
- Q -> gen_server2:reply(From, {new, Q}),
- ok = file_handle_cache:register_callback(
- rabbit_amqqueue, set_maximum_since_use,
- [self()]),
- ok = rabbit_memory_monitor:register(
- self(), {rabbit_amqqueue,
- set_ram_duration_target, [self()]}),
- BQS = bq_init(BQ, Q, Recover),
- State1 = process_args(State#q{backing_queue_state = BQS}),
- rabbit_event:notify(queue_created,
- infos(?CREATION_EVENT_KEYS, State1)),
- rabbit_event:if_enabled(State1, #q.stats_timer,
- fun() -> emit_stats(State1) end),
- noreply(State1);
- Q1 -> {stop, normal, {existing, Q1}, State}
- end.
+ not_found ->
+ {stop, normal, not_found, State};
+ Q1 ->
+ case matches(Recover, Q, Q1) of
+ true ->
+ gen_server2:reply(From, {new, Q}),
+ ok = file_handle_cache:register_callback(
+ rabbit_amqqueue, set_maximum_since_use, [self()]),
+ ok = rabbit_memory_monitor:register(
+ self(), {rabbit_amqqueue,
+ set_ram_duration_target, [self()]}),
+ BQS = bq_init(BQ, Q, Recover),
+ State1 = process_args(State#q{backing_queue_state = BQS}),
+ rabbit_event:notify(queue_created,
+ infos(?CREATION_EVENT_KEYS, State1)),
+ rabbit_event:if_enabled(State1, #q.stats_timer,
+ fun() -> emit_stats(State1) end),
+ noreply(State1);
+ false ->
+ {stop, normal, {existing, Q1}, State}
+ end
+ end.
+
+matches(true, Q, Q) -> true;
+matches(true, Q, Q1) -> false;
+matches(false, Q1, Q2) ->
+ %% i.e. not policy
+ Q1#amqqueue.name =:= Q2#amqqueue.name andalso
+ Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso
+ Q1#amqqueue.auto_delete =:= Q2#amqqueue.auto_delete andalso
+ Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso
+ Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso
+ Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso
+ Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids andalso
+ Q1#amqqueue.mirror_nodes =:= Q2#amqqueue.mirror_nodes.
bq_init(BQ, Q, Recover) ->
Self = self(),
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 7455c797e3..d87c607d32 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -18,13 +18,13 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([recover/0, callback/4, declare/6,
+-export([recover/0, policy_changed/2, callback/4, declare/6,
assert_equivalence/6, assert_args_equivalence/2, check_type/1,
lookup/1, lookup_or_die/1, list/1, lookup_scratch/2, update_scratch/3,
info_keys/0, info/1, info/2, info_all/1, info_all/2,
route/2, delete/2]).
%% these must be run inside a mnesia tx
--export([maybe_auto_delete/1, serial/1, peek_serial/1]).
+-export([maybe_auto_delete/1, serial/1, peek_serial/1, update/2]).
%%----------------------------------------------------------------------------
@@ -40,6 +40,8 @@
-spec(callback/4::
(rabbit_types:exchange(), fun_name(), non_neg_integer() | atom(),
[any()]) -> 'ok').
+-spec(policy_changed/2 ::
+ (rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok').
-spec(declare/6 ::
(name(), type(), boolean(), boolean(), boolean(),
rabbit_framing:amqp_table())
@@ -64,6 +66,9 @@
rabbit_types:ok(term()) |
rabbit_types:error('not_found')).
-spec(update_scratch/3 :: (name(), atom(), fun((any()) -> any())) -> 'ok').
+-spec(update/2 ::
+ (name(),
+ fun((rabbit_types:exchange()) -> rabbit_types:exchange())) -> 'ok').
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (rabbit_types:exchange()) -> rabbit_types:infos()).
-spec(info/2 ::
@@ -89,7 +94,8 @@
%%----------------------------------------------------------------------------
--define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments]).
+-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments,
+ policy]).
recover() ->
Xs = rabbit_misc:table_filter(
@@ -119,6 +125,8 @@ callback(X = #exchange{type = XType}, Fun, Serial0, Args) ->
Module = type_to_module(XType),
apply(Module, Fun, [Serial(Module:serialise_events()) | Args]).
+policy_changed(X1, X2) -> callback(X1, policy_changed, none, [X1, X2]).
+
serialise_events(X = #exchange{type = Type}) ->
case [Serialise || M <- decorators(),
Serialise <- [M:serialise_events(X)],
@@ -140,12 +148,12 @@ decorators() ->
[M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)].
declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
- X = #exchange{name = XName,
- type = Type,
- durable = Durable,
- auto_delete = AutoDelete,
- internal = Internal,
- arguments = Args},
+ X = rabbit_policy:set(#exchange{name = XName,
+ type = Type,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ internal = Internal,
+ arguments = Args}),
XT = type_to_module(Type),
%% We want to upset things if it isn't ok
ok = XT:validate(X),
@@ -252,29 +260,35 @@ lookup_scratch(Name, App) ->
update_scratch(Name, App, Fun) ->
rabbit_misc:execute_mnesia_transaction(
fun() ->
- case mnesia:wread({rabbit_exchange, Name}) of
- [X = #exchange{durable = Durable, scratches = Scratches0}] ->
- Scratches1 = case Scratches0 of
- undefined -> orddict:new();
- _ -> Scratches0
- end,
- Scratch = case orddict:find(App, Scratches1) of
- {ok, S} -> S;
- error -> undefined
- end,
- Scratches2 = orddict:store(App, Fun(Scratch), Scratches1),
- X1 = X#exchange{scratches = Scratches2},
- ok = mnesia:write(rabbit_exchange, X1, write),
- case Durable of
- true -> ok = mnesia:write(rabbit_durable_exchange,
- X1, write);
- _ -> ok
- end;
- [] ->
- ok
- end
+ update(Name,
+ fun(X = #exchange{scratches = Scratches0}) ->
+ Scratches1 = case Scratches0 of
+ undefined -> orddict:new();
+ _ -> Scratches0
+ end,
+ Scratch = case orddict:find(App, Scratches1) of
+ {ok, S} -> S;
+ error -> undefined
+ end,
+ Scratches2 = orddict:store(
+ App, Fun(Scratch), Scratches1),
+ X#exchange{scratches = Scratches2}
+ end)
end).
+update(Name, Fun) ->
+ case mnesia:wread({rabbit_exchange, Name}) of
+ [X = #exchange{durable = Durable}] ->
+ X1 = Fun(X),
+ ok = mnesia:write(rabbit_exchange, X1, write),
+ case Durable of
+ true -> ok = mnesia:write(rabbit_durable_exchange, X1, write);
+ _ -> ok
+ end;
+ [] ->
+ ok
+ end.
+
info_keys() -> ?INFO_KEYS.
map(VHostPath, F) ->
@@ -290,6 +304,7 @@ i(durable, #exchange{durable = Durable}) -> Durable;
i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete;
i(internal, #exchange{internal = Internal}) -> Internal;
i(arguments, #exchange{arguments = Arguments}) -> Arguments;
+i(policy, X) -> rabbit_policy:name(X);
i(Item, _) -> throw({bad_argument, Item}).
info(X = #exchange{}) -> infos(?INFO_KEYS, X).
diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl
index 4fa87485bb..9f30688df9 100644
--- a/src/rabbit_exchange_decorator.erl
+++ b/src/rabbit_exchange_decorator.erl
@@ -18,6 +18,8 @@
-export([behaviour_info/1]).
+%% TODO make this into a modern typed callback
+
behaviour_info(callbacks) ->
[
{description, 0},
@@ -40,7 +42,10 @@ behaviour_info(callbacks) ->
{add_binding, 3},
%% called after bindings have been deleted.
- {remove_bindings, 3}
+ {remove_bindings, 3},
+
+ %% called when the policy attached to this exchange changes.
+ {policy_changed, 3}
];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index 1027570c8b..e6470b721e 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -58,6 +58,10 @@
rabbit_framing:amqp_table()) ->
'ok' | rabbit_types:connection_exit().
+%% called when the policy attached to this exchange changes.
+-callback policy_changed (
+ serial(), rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'.
+
-else.
-export([behaviour_info/1]).
@@ -65,7 +69,7 @@
behaviour_info(callbacks) ->
[{description, 0}, {serialise_events, 0}, {route, 2}, {validate, 1},
{create, 2}, {delete, 3}, {add_binding, 3}, {remove_bindings, 3},
- {assert_args_equivalence, 2}];
+ {assert_args_equivalence, 2}, {policy_changed, 3}];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index cdec1cb9f2..9a5665c078 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -20,7 +20,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3,
+-export([validate/1, create/2, delete/3, policy_changed/3,
add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
-rabbit_boot_step({?MODULE,
@@ -43,6 +43,7 @@ route(#exchange{name = Name},
validate(_X) -> ok.
create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
+policy_changed(_Tx, _X1, _X2) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index a64f2c2924..d9a2f60fdd 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -20,7 +20,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3, add_binding/3,
+-export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-rabbit_boot_step({?MODULE,
@@ -42,6 +42,7 @@ route(#exchange{name = Name}, _Delivery) ->
validate(_X) -> ok.
create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
+policy_changed(_Tx, _X1, _X2) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index 61917d8f4a..516b78e59c 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -21,7 +21,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3, add_binding/3,
+-export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-rabbit_boot_step({?MODULE,
@@ -116,6 +116,7 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
validate(_X) -> ok.
create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
+policy_changed(_Tx, _X1, _X2) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl
index 82d27960bc..101fe434e9 100644
--- a/src/rabbit_exchange_type_invalid.erl
+++ b/src/rabbit_exchange_type_invalid.erl
@@ -20,7 +20,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3,
+-export([validate/1, create/2, delete/3, policy_changed/3,
add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
description() ->
@@ -40,6 +40,7 @@ route(#exchange{name = Name, type = Type}, _) ->
validate(_X) -> ok.
create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
+policy_changed(_Tx, _X1, _X2) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 3160fdf46e..644d9acf07 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -21,7 +21,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3, add_binding/3,
+-export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-rabbit_boot_step({?MODULE,
@@ -58,6 +58,8 @@ delete(transaction, #exchange{name = X}, _Bs) ->
delete(none, _Exchange, _Bs) ->
ok.
+policy_changed(_Tx, _X1, _X2) -> ok.
+
add_binding(transaction, _Exchange, Binding) ->
internal_add_binding(Binding);
add_binding(none, _Exchange, _Binding) ->
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
new file mode 100644
index 0000000000..2697b93ddd
--- /dev/null
+++ b/src/rabbit_policy.erl
@@ -0,0 +1,202 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_policy).
+
+-behaviour(rabbit_runtime_parameter).
+
+-include("rabbit.hrl").
+
+-import(rabbit_misc, [pget/2]).
+
+-export([register/0]).
+-export([name/1, get/2, set/1]).
+-export([validate/3, validate_clear/2, notify/3, notify_clear/2]).
+
+-rabbit_boot_step({?MODULE,
+ [{description, "policy parameters"},
+ {mfa, {rabbit_policy, register, []}},
+ {requires, rabbit_registry},
+ {enables, recovery}]}).
+
+register() ->
+ rabbit_registry:register(runtime_parameter, <<"policy">>, ?MODULE).
+
+name(#amqqueue{policy = Policy}) -> name0(Policy);
+name(#exchange{policy = Policy}) -> name0(Policy).
+
+name0(undefined) -> none;
+name0(Policy) -> pget(<<"name">>, Policy).
+
+set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)};
+set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}.
+
+set0(Name) -> match(Name, list()).
+
+get(Name, #amqqueue{policy = Policy}) -> get0(Name, Policy);
+get(Name, #exchange{policy = Policy}) -> get0(Name, Policy).
+
+get0(_Name, undefined) -> {error, not_found};
+get0(Name, List) -> case pget(<<"policy">>, List) of
+ undefined -> {error, not_found};
+ Policy -> case pget(Name, Policy) of
+ undefined -> {error, not_found};
+ Value -> {ok, Value}
+ end
+ end.
+
+%%----------------------------------------------------------------------------
+
+validate(<<"policy">>, _Name, Term) ->
+ assert_contents(policy_validation(), Term).
+
+validate_clear(<<"policy">>, _Name) ->
+ ok.
+
+notify(<<"policy">>, _Name, _Term) ->
+ update_policies().
+
+notify_clear(<<"policy">>, _Name) ->
+ update_policies().
+
+%%----------------------------------------------------------------------------
+
+list() ->
+ [[{<<"name">>, pget(key, P)} | pget(value, P)]
+ || P <- rabbit_runtime_parameters:list(<<"policy">>)].
+
+update_policies() ->
+ Policies = list(),
+ {Xs, Qs} = rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ {[update_exchange(X, Policies) ||
+ VHost <- rabbit_vhost:list(),
+ X <- rabbit_exchange:list(VHost)],
+ [update_queue(Q, Policies) ||
+ VHost <- rabbit_vhost:list(),
+ Q <- rabbit_amqqueue:list(VHost)]}
+ end),
+ [notify(X) || X <- Xs],
+ [notify(Q) || Q <- Qs],
+ ok.
+
+update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) ->
+ NewPolicy = match(XName, Policies),
+ case NewPolicy of
+ OldPolicy -> no_change;
+ _ -> rabbit_exchange:update(
+ XName, fun(X1) -> X1#exchange{policy = NewPolicy} end),
+ {X, X#exchange{policy = NewPolicy}}
+ end.
+
+update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) ->
+ NewPolicy = match(QName, Policies),
+ case NewPolicy of
+ OldPolicy -> no_change;
+ _ -> rabbit_amqqueue:update(
+ QName, fun(Q1) -> Q1#amqqueue{policy = NewPolicy} end),
+ {Q, Q#amqqueue{policy = NewPolicy}}
+ end.
+
+notify(no_change)->
+ ok;
+notify({X1 = #exchange{}, X2 = #exchange{}}) ->
+ rabbit_exchange:policy_changed(X1, X2);
+notify({Q1 = #amqqueue{}, Q2 = #amqqueue{}}) ->
+ rabbit_amqqueue:policy_changed(Q1, Q2).
+
+match(Name, Policies) ->
+ case lists:sort(fun sort_pred/2, [P || P <- Policies, matches(Name, P)]) of
+ [] -> undefined;
+ [Policy | _Rest] -> Policy
+ end.
+
+matches(#resource{name = Name, virtual_host = VHost}, Policy) ->
+ Prefix = pget(<<"prefix">>, Policy),
+ case pget(<<"vhost">>, Policy) of
+ undefined -> prefix(Prefix, Name);
+ VHost -> prefix(Prefix, Name);
+ _ -> false
+ end.
+
+prefix(A, B) -> lists:prefix(binary_to_list(A), binary_to_list(B)).
+
+sort_pred(A, B) ->
+ R = size(pget(<<"prefix">>, A)) >= size(pget(<<"prefix">>, B)),
+ case {pget(<<"vhost">>, A), pget(<<"vhost">>, B)} of
+ {undefined, undefined} -> R;
+ {undefined, _} -> true;
+ {_, undefined} -> false;
+ _ -> R
+ end.
+
+%%----------------------------------------------------------------------------
+
+policy_validation() ->
+ [{<<"vhost">>, binary, optional},
+ {<<"prefix">>, binary, mandatory},
+ {<<"policy">>, list, mandatory}].
+
+%% TODO this is mostly duplicated from
+%% rabbit_federation_parameters. Sort that out in some way.
+
+assert_type(Name, {Type, Opts}, Term) ->
+ assert_type(Name, Type, Term),
+ case lists:member(Term, Opts) of
+ true -> ok;
+ false -> {error, "~s must be one of ~p", [Name, Opts]}
+ end;
+
+assert_type(_Name, number, Term) when is_number(Term) ->
+ ok;
+
+assert_type(Name, number, Term) ->
+ {error, "~s should be number, actually was ~p", [Name, Term]};
+
+assert_type(_Name, binary, Term) when is_binary(Term) ->
+ ok;
+
+assert_type(Name, binary, Term) ->
+ {error, "~s should be binary, actually was ~p", [Name, Term]};
+
+assert_type(_Name, list, Term) when is_list(Term) ->
+ ok;
+
+assert_type(Name, list, Term) ->
+ {error, "~s should be list, actually was ~p", [Name, Term]}.
+
+assert_contents(Constraints, Term) when is_list(Term) ->
+ {Results, Remainder}
+ = lists:foldl(
+ fun ({Name, Constraint, Needed}, {Results0, Term0}) ->
+ case {lists:keytake(Name, 1, Term0), Needed} of
+ {{value, {Name, Value}, Term1}, _} ->
+ {[assert_type(Name, Constraint, Value) | Results0],
+ Term1};
+ {false, mandatory} ->
+ {[{error, "Key \"~s\" not found", [Name]} |
+ Results0], Term0};
+ {false, optional} ->
+ {Results0, Term0}
+ end
+ end, {[], Term}, Constraints),
+ case Remainder of
+ [] -> Results;
+ _ -> [{error, "Unrecognised terms ~p", [Remainder]} | Results]
+ end;
+
+assert_contents(_Constraints, Term) ->
+ {error, "Not a list ~p", [Term]}.
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 87e560e869..18704807ba 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -38,6 +38,8 @@
-rabbit_upgrade({topic_trie_node, mnesia, []}).
-rabbit_upgrade({runtime_parameters, mnesia, []}).
-rabbit_upgrade({exchange_scratches, mnesia, [exchange_scratch]}).
+-rabbit_upgrade({policy, mnesia,
+ [exchange_scratches, ha_mirrors]}).
%% -------------------------------------------------------------------
@@ -59,6 +61,7 @@
-spec(mirrored_supervisor/0 :: () -> 'ok').
-spec(topic_trie_node/0 :: () -> 'ok').
-spec(runtime_parameters/0 :: () -> 'ok').
+-spec(policy/0 :: () -> 'ok').
-endif.
@@ -211,6 +214,32 @@ exchange_scratches(Table) ->
end,
[name, type, durable, auto_delete, internal, arguments, scratches]).
+policy() ->
+ ok = exchange_policy(rabbit_exchange),
+ ok = exchange_policy(rabbit_durable_exchange),
+ ok = queue_policy(rabbit_queue),
+ ok = queue_policy(rabbit_durable_queue).
+
+exchange_policy(Table) ->
+ transform(
+ Table,
+ fun ({exchange, Name, Type, Dur, AutoDel, Int, Args, Scratches}) ->
+ {exchange, Name, Type, Dur, AutoDel, Int, Args, Scratches,
+ undefined}
+ end,
+ [name, type, durable, auto_delete, internal, arguments, scratches,
+ policy]).
+
+queue_policy(Table) ->
+ transform(
+ Table,
+ fun ({amqqueue, Name, Dur, AutoDel, Excl, Args, Pid, SPids, MNodes}) ->
+ {amqqueue, Name, Dur, AutoDel, Excl, Args, Pid, SPids, MNodes,
+ undefined}
+ end,
+ [name, durable, auto_delete, exclusive_owner, arguments, pid,
+ slave_pids, mirror_nodes, policy]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->